Enable the Matrix to Rocket.Chat interaction
You’ve completed one direction of the flow, delivering customer messages from Rocket.Chat to agents in Matrix. Now you need to carry agent responses from Matrix back to customers in Rocket.Chat.
As previously pointed out, Camel’s collection of components does not include one for Matrix. However, Matrix offers a feature-rich client-server API. The API is built around the notion of events, which describe something that has happened on the platform, such as the creation of a room or a user joining a room. The sync method of this API synchronizes the client’s state with the latest state on the server. By calling the sync API in a loop, the client (our Camel integration) can subscribe to events and act accordingly.
For simplicity, this part of the Matrix integration is already implemented. As mentioned, it calls the sync API in a loop, filters for the events we are interested in (room leave events and room message events), and forwards each event to a Camel route.
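To make the listener's behaviour concrete, here is a minimal Python sketch of a sync loop. It is not the lab's pre-built listener, whose code is not shown here. The names `fetch_sync`, `forward` and `max_rounds` are hypothetical; in a real client, `fetch_sync` would call the Matrix `GET /_matrix/client/v3/sync` endpoint with the `since` token, and `forward` would hand the event to the Camel route.

```python
from typing import Callable, Iterator, Optional

# Event types as named by the Matrix client-server API.
MESSAGE = "m.room.message"
MEMBER = "m.room.member"


def wanted(event: dict) -> bool:
    """Keep room messages and membership changes to 'leave'."""
    if event.get("type") == MESSAGE:
        return True
    return (
        event.get("type") == MEMBER
        and event.get("content", {}).get("membership") == "leave"
    )


def timeline_events(sync_response: dict) -> Iterator[dict]:
    """Yield timeline events of joined and left rooms, tagged with the room id."""
    rooms = sync_response.get("rooms", {})
    for section in ("join", "leave"):
        for room_id, room in rooms.get(section, {}).items():
            for event in room.get("timeline", {}).get("events", []):
                yield {**event, "room_id": room_id}


def listen(fetch_sync: Callable[[Optional[str]], dict],
           forward: Callable[[dict], None],
           max_rounds: int) -> None:
    """Call sync repeatedly, passing the previous next_batch token as 'since'."""
    since = None  # the first call has no token and returns the current state
    for _ in range(max_rounds):
        response = fetch_sync(since)
        for event in timeline_events(response):
            if wanted(event):
                forward(event)
        since = response["next_batch"]  # resume from where this batch ended
```

Passing `next_batch` back as `since` is what turns repeated calls into a subscription: each round returns only what happened after the previous one. A real listener would loop indefinitely; `max_rounds` exists only to keep the sketch bounded.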
1. Implement the Agent to Client flow
The listener described above is responsible for picking up agent messages posted in Matrix and directing them to the Camel route you need to implement to process the event.
In essence, our route needs to obtain from the cache the context for this particular customer/agent conversation, prepare the JSON data containing the agent’s answer, and send it to the AMQ broker. The Rocket.Chat integration will consume the event and deliver it to the customer.
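The data handling just described can be pictured in plain Python. This is only a sketch of the logic, not Camel code: the function name and the dictionary-based cache are hypothetical, while the field names (`text`, `user`, `room`, `source`, `uname`) and the `support.` address prefix are taken from the route implemented in this section.

```python
import json
from typing import Optional, Tuple


def process_agent_message(event: dict, cache: dict) -> Optional[Tuple[str, str]]:
    """Return (AMQP address, JSON payload), or None when no context is cached."""
    context = cache.get(event["room"])   # the context is keyed by the Matrix room
    if context is None:
        return None                      # unknown conversation: ignore the event
    payload = dict(context)              # reuse the cached context as the payload
    payload["text"] = event["text"]      # the agent's answer
    payload["agent"] = event["user"]     # who answered
    address = "support." + context["source"]["uname"]
    return address, json.dumps(payload)
```

The cached context already carries everything the Rocket.Chat side needs to find the customer, so only the text and the agent name have to change before the payload is sent.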
What will I learn?
In the content that follows you will learn the following concepts:
Ensure you’ve stopped your dev instance from the test in the previous section. If it is still running, press Ctrl+C in your terminal to stop it.
Start your implementation:
- From your Dev Spaces terminal, execute the kamel command below to create a new source file to process Matrix events:
  kamel init routes-from-matrix-main.yaml
  The new file has a YAML extension. Camel K automatically generates a skeleton for you using the YAML DSL (Domain Specific Language).
- Open the routes-from-matrix-main.yaml file in your editor.
- Delete the example route (the full from definition).
- Replace the deleted route with the following snippet:
- route:
    from:
      uri: "direct:process-agent-message" # <1>
      steps:
        - setProperty: # <2>
            name: text
            simple: ${body.get(text)}
        - setProperty: # <2>
            name: agent
            simple: ${body.get(user)}
        - setProperty: # <2>
            name: key
            simple: ${body.get(room)}
        - to:
            uri: "direct:cache-get" # <3>
        - choice:
            when:
              - simple: ${body} != null # <4>
                steps:
                  - to:
                      uri: "language:simple:${body.replace(text,${exchangeProperty.text})}" # <5>
                      parameters:
                        transform: false
                  - to:
                      uri: "language:simple:${body.put(agent,${exchangeProperty.agent})}" # <5>
                      parameters:
                        transform: false
                  - setProperty: # <6>
                      name: source
                      simple: ${body.get(source).get(uname)}
                  - marshal: # <7>
                      json: {}
                  - toD:
                      uri: "amqp:topic:support.${exchangeProperty.source}" # <8>
                      parameters:
                        connectionFactory: "#myFactory"
                  - setBody:
                      simple: '${exchangeProperty.agent}: ${exchangeProperty.text}' # <9>
                  - removeHeaders:
                      pattern: "*"
                  - toD:
                      uri: kafka:support.${env.NAMESPACE}.matrix${exchangeProperty.key.replace(":","-").replace("!","-")}
            otherwise: # <10>
              steps:
                - log: "no cache entry, ignoring message from user: ${exchangeProperty.agent}"
Details of the above route:
[1] Defines the from element with the direct component so that other Camel routes can invoke it.
[2] Keeps the necessary values from Matrix’s event as properties. The Matrix JSON event has already been unmarshalled for you.
[3] Fetches the customer/agent context from the cache system. We use Matrix’s room key as our key to fetch the cache entry.
[4] Evaluates with a choice whether the cache entry exists.
  - If true, it executes [5] to [9].
  - If false, it executes the otherwise block [10].
[5] When true, the cached payload is reused: the route updates the text field to contain the agent’s answer and also injects the agent’s name. There are many strategies in Camel to manipulate data. For minor changes to payloads, the language component is very handy.
[6] Obtains from the cache entry the uname (the customer’s unique name), which is necessary to route the event to the right destination.
[7] Marshals the Java Map into JSON.
[8] Sends the event over AMQP to the AMQ Broker. The call uses toD (dynamic to) to evaluate the target AMQP address at runtime using the source property.
  The amqp component requires no extra parameters because it has been pre-configured for you. It’s secured with TLS and SCRAM, and points to the shared environment’s AMQ Broker.
[9] Finally, the interaction is recorded and streamed to Kafka:
  - A payload in the format agent: text is prepared using Camel’s simple expression.
  - The route pushes the message to Kafka.
  - Note that the Kafka topic uses your NAMESPACE, again to prevent clashes with other students, since you all share the same Kafka cluster.
  - The kafka component requires no extra parameters because it has been pre-configured for you. It’s secured with TLS and SCRAM, and points to the shared environment’s Kafka cluster.
[10] Lastly, when a cache entry does not exist, we ignore the message. This is necessary in our lab to prevent other students from interfering with your tests. In a real-world implementation, you would perform the check anyway for robust error handling.
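The Kafka topic in the last step is derived from the Matrix room key. The sketch below mirrors that expression in Python to show what the resulting names look like. The function names and the sample values are hypothetical; the replacements and the `support.<namespace>.matrix<room>` layout come from the route. The substitution matters because Kafka topic names may only contain ASCII letters, digits, `.`, `_` and `-`, while Matrix room identifiers contain `!` and `:`.

```python
def kafka_topic(namespace: str, room: str) -> str:
    """Build the conversation topic name the way the route's toD expression does."""
    safe_room = room.replace(":", "-").replace("!", "-")
    return f"support.{namespace}.matrix{safe_room}"


def kafka_record(agent: str, text: str) -> str:
    """Format the record body as 'agent: text', like the route's setBody step."""
    return f"{agent}: {text}"


# Hypothetical sample values, for illustration only.
print(kafka_topic("globex-camel-user1", "!abc:matrix.example"))
# prints: support.globex-camel-user1.matrix-abc-matrix.example
```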
2. Implement the room leave event.
A crucial phase of the customer/agent interaction is when both parties agree to close the conversation. At that point, the expected sequence of actions is the following:
- The agent manually leaves the room in Matrix.
- The customer receives a notification indicating the conversation has been closed.
When the agent leaves the room, Matrix fires a room leave event, which our listener picks up and directs to a route called process-room-leave-event.
Let’s implement the required logic, which is very similar to our previously defined route.
In the same YAML file (routes-from-matrix-main.yaml), copy and paste the snippet below:
- route:
    from:
      uri: "direct:process-room-leave-event"
      steps:
        - log:
            message: ${body}
        - setProperty:
            name: key
            simple: ${body.get(room)}
        - setProperty:
            name: agent
            simple: ${body.get(user)}
        - to:
            uri: "direct:cache-get" # <1>
        - choice:
            when:
              - simple: ${body} != null
                steps:
                  - to:
                      uri: "language:simple:${body.replace(text,'your session ended, conversation is now closed.')}" # <2>
                      parameters:
                        transform: false
                  - to:
                      uri: "language:simple:${body.put(agent,'support')}" # <2>
                      parameters:
                        transform: false
                  - setProperty:
                      name: source
                      simple: ${body.get(source).get(uname)}
                  - setProperty:
                      name: key-rocketchat
                      simple: ${body.get(source).get(room)}-${body.get(user)}
                  - setProperty:
                      name: kafka-client
                      simple: matrix${body.get(target).get(room).replace(":","-").replace("!","-")}
                  - marshal:
                      json: {}
                  - setProperty:
                      name: context
                      simple: ${bodyAs(String)}
                  - toD:
                      uri: "amqp:topic:support.${exchangeProperty.source}" # <3>
                      parameters:
                        connectionFactory: "#myFactory"
                  - to:
                      uri: "direct:cache-remove" # <4>
                  - setProperty:
                      name: key
                      simple: ${exchangeProperty.key-rocketchat}
                  - to:
                      uri: "direct:cache-remove" # <5>
                  - setBody:
                      simple: done # <6>
                  - removeHeaders:
                      pattern: "*"
                  - setHeader:
                      name: context
                      simple: ${exchangeProperty.context} # <6>
                  - toD:
                      uri: kafka:support.${env.NAMESPACE}.${exchangeProperty.kafka-client} # <7>
                  - setBody:
                      simple: ${exchangeProperty.kafka-client}
                  - toD:
                      uri: "kafka:support.${env.NAMESPACE}.closed" # <8>
            otherwise:
              steps:
                - log: no cache entry, ignoring message
You will observe the route above is almost identical to the previous one.
Summary of the differences:
[1] It also fetches the customer/agent context from the cache system.
[2] It reuses the cached payload, setting the text field to a closing message and the agent field to support.
[3] It sends the closing event via AMQP, and proceeds with [4] and [5] to delete the two cache entries relevant to this conversation.
[4] It deletes the cache entry keyed by the target identifier (Matrix).
[5] It deletes the cache entry keyed by the source identifier (Rocket.Chat).
[6] Finally, it prepares the body and headers to send two closure Kafka events, [7] and [8].
[7] The first event to Kafka contains the context information, sent to the conversation topic.
[8] The second one is a signal event, a notification that allows other applications to react.
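The closing sequence can also be traced in plain Python. This is a hedged sketch of the route's logic rather than Camel code: the function name, the dictionary-based cache and the `to`/`body`/`headers` message shape are hypothetical, while the field names, keys and destinations follow the route above.

```python
import json
from typing import List, Optional


def process_room_leave(event: dict, cache: dict, namespace: str) -> Optional[List[dict]]:
    """Return the messages the route would send, or None when no context is cached."""
    room_key = event["room"]
    context = cache.get(room_key)
    if context is None:
        return None  # unknown conversation: ignore the event

    closing = dict(context)
    closing["text"] = "your session ended, conversation is now closed."
    closing["agent"] = "support"

    # Second cache key and Kafka client name, both derived from the cached context.
    rocketchat_key = f"{context['source']['room']}-{context['user']}"
    kafka_client = "matrix" + context["target"]["room"].replace(":", "-").replace("!", "-")
    payload = json.dumps(closing)

    messages = [{"to": f"amqp:topic:support.{context['source']['uname']}",
                 "body": payload, "headers": {}}]

    # Drop both cache entries: the Matrix-keyed one and the Rocket.Chat-keyed one.
    cache.pop(room_key, None)
    cache.pop(rocketchat_key, None)

    # First Kafka event: body 'done', with the context carried in a header.
    messages.append({"to": f"kafka:support.{namespace}.{kafka_client}",
                     "body": "done", "headers": {"context": payload}})
    # Second Kafka event: a signal on the 'closed' topic naming the conversation.
    messages.append({"to": f"kafka:support.{namespace}.closed",
                     "body": kafka_client, "headers": {}})
    return messages
```

Both cache keys have to be computed before anything is removed, which is why the route stores key-rocketchat and kafka-client as properties before marshalling the body.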
You have completed the return processing flow of messages from agents (in Matrix’s Element) to customers (in Rocket.Chat). Next, deploy your integration in OpenShift and send some messages to validate it.
3. Deploy and test your code
With the Camel K client, kamel, you can deploy your integrations with one command. Camel K takes care of collecting all your sources, containerizing them, and deploying an instance.
Let’s deploy your code.
- From your terminal, execute the following command:
  ./deploy.sh
  The deploy.sh script executes a kamel run command that defines all the support resources and parameters needed to run your integration.
  Output:
  matrix (main) $ ./deploy.sh
  No IntegrationPlatform resource in globex-camel-{user_name} namespace
  Integration "matrix" created
  You can ignore any warning message stating "Unable to verify existence of operator id [camel-k] due to lack of user privileges".
- You can inspect the logs by running the following command:
  kamel log matrix
  If you encounter errors or unexpected results, you might have missed a step in the instructions or made some other mistake.
  If so, try again using the prebuilt code by running the following command. This code implements exactly the same logic as the steps above.
  ./safe-deploy.sh
- From Matrix’s Element application:
  - Click on the newly created channel rocketchat-{user_name} to display the messages.
  - Type a message, for example:
    My name is Bruno, how can I help you today?
    and send it.
- From Rocket.Chat…
  You should see the agent’s message sent from Matrix appear in the Rocket.Chat channel.
- Exchange a few more messages to simulate a conversation.
- Then, from Matrix’s Element chat window, close the session by following these steps, as per the illustration below:
  - Right-click on the room rocketchat-{user_name}.
  - Click Leave.
  - Confirm your action to leave the room.
In Rocket.Chat, as shown above on the right-hand side, you should see a notification informing you that the session has ended.
Well done, you’ve completed the full integration, both ways, between Rocket.Chat and Matrix.
In contrast with running in DEV mode, the deploy.sh command made the Camel K operator fully deploy your code in an OpenShift pod named matrix, which you can see running in the Topology view.
You can also use the kamel client from your terminal to obtain information about your deployed Camel K instances:
kamel get
No IntegrationPlatform resource in globex-camel-{user_name} namespace
NAME    PHASE    KIT
matrix  Running  globex-camel-{user_name}/kit-chcc8ts5v3ov25mqg460