Overview
The RPI Queue Reader service is used to drain Queue Listener and RPI Realtime queues. The following section documents how to set up the service.
Queue Listener setup
Queue listeners facilitate the monitoring of a "listener queue" for the arrival of data. Data arrives in the form of JSON packages—placed on the queue, for example, by an external system, or at submission of a web form. Downstream queue activities can then use this data to execute offers. Queue listeners might typically be used for the sending of emails, for example, after a customer makes a purchase, or when a web form is submitted in a landing page.
Refer to the RPI Reference Guide: Interaction Designer Queue Listener section for more details on queue listeners.
To configure RPI to use queue listeners:
- Configure the listener queue provider in the Queue Listener Providers configuration interface.
- Depending on your situation, use one of the following methods to update your Helm chart using the RPI Helm Assistant:
  - If you’re deploying RPI v7.7 for the first time, use the process outlined in the Initial deployment process section. In the Generate tab, scroll down to the Services section and locate Queue Reader.
  - If you’ve already deployed RPI v7.7 using the RPI Helm Assistant and want to add this functionality later on, you’ll be editing your overrides.yaml file directly as described in the Post-deployment update process section. In the Reference tab, you’ll find the relevant settings in the queuereader and realtimeapi sections.
Queue Reader settings
The Queue Reader is enabled by default. The following table explains what each of the settings means (realtimeConfiguration is described in the next section):
| Setting | Description |
|---|---|
| | Sets whether Queue Reader is enabled ( |
| | Sets the number of Kubernetes pod replicas (instances) to run for the RPI Queue Reader service. Increasing it runs more Queue Reader pods in parallel (which can increase throughput), subject to how you’ve configured the Queue Reader mode (e.g., distributed vs non-distributed) and any autoscaling you may have enabled. |
| | Pushes messages received that result in error (e.g., missing trigger key or malformed JSON) to a separate queue. The commented-out setting |
| | Pushes messages received for an inactive trigger to a separate queue. How long it takes for a trigger to be considered inactive is set by the commented-out setting |
| | Turns Horizontal Pod Autoscaling (HPA) on/off for the Queue Reader deployment. |
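The error and inactive-trigger routing described in the table can be pictured with a small sketch. This is an illustration only, not the Queue Reader's actual implementation: the field name `triggerKey` is hypothetical (the real JSON schema is not shown in this section), and inactivity is simplified here to "not in a set of active triggers", whereas the service determines it from a configurable time period.

```python
import json


def classify_message(raw, active_triggers, trigger_field="triggerKey"):
    """Decide where one raw queue message would go in this sketch.

    Returns "process", "error", or "inactive".
    trigger_field is a hypothetical name used for illustration only.
    """
    try:
        package = json.loads(raw)
    except ValueError:
        # Malformed JSON: candidate for the error queue.
        return "error"

    if not isinstance(package, dict):
        return "error"

    key = package.get(trigger_field)
    if not isinstance(key, str):
        # Missing (or unusable) trigger key: candidate for the error queue.
        return "error"

    if key not in active_triggers:
        # Simplification: a trigger is "inactive" if it is not in the set.
        return "inactive"

    return "process"
```

For example, `classify_message('{"triggerKey": "welcome"}', {"welcome"})` returns `"process"`, while a message that fails to parse returns `"error"`.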
Realtime Queue settings
Realtime event processing can be operated in two modes (set in queuereader.realtimeConfiguration.isDistributed):
- Non-distributed mode: All work for a single queue is handled by a single service. There is no need for an external queue or cache to hold interim data. This is the default setting (queuereader.realtimeConfiguration.isDistributed: false), and should be adequate for the vast majority of cases. If you’re using non-distributed mode, you do not need to configure internalCache and internalQueues.
- Distributed mode: Facilitates more than one queue reader service draining the same queue. Allows for scaling to improve processing performance. Interim data is stored in an external (Redis) cache and internal RabbitMQ queue broker, which also protects against data loss. When setting queuereader.realtimeConfiguration.isDistributed: true, you should also configure internalCache and internalQueues.
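The rule above (distributed mode requires internalCache and internalQueues; non-distributed mode does not) can be expressed as a quick pre-deployment check. The sketch below is a hypothetical helper, not part of RPI or the RPI Helm Assistant. It takes your overrides as an already-parsed dictionary and assumes that internalCache and internalQueues sit alongside isDistributed under queuereader.realtimeConfiguration; verify the actual nesting in the Reference tab.

```python
def check_realtime_config(overrides):
    """Return a list of problems found in the realtime configuration.

    overrides is the parsed content of overrides.yaml as nested dicts.
    Assumption: internalCache and internalQueues are siblings of
    isDistributed under queuereader.realtimeConfiguration.
    """
    queuereader = overrides.get("queuereader") or {}
    realtime = queuereader.get("realtimeConfiguration") or {}

    # Non-distributed mode is the default and needs nothing further.
    if not realtime.get("isDistributed", False):
        return []

    # Distributed mode: both internal settings should be configured.
    return [
        f"isDistributed is true but {name} is not configured"
        for name in ("internalCache", "internalQueues")
        if not realtime.get(name)
    ]
```

An empty list means the mode and its dependent settings are consistent; each string in a non-empty list names a setting that still needs attention.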
The following table describes the settings you need to configure when using distributed mode:
| Setting | Description |
|---|---|
| | Internal cache for distributed processing. By default, this is set to |
| | Sets the following: |
| | Internal queue for distributed processing. By default, this is set to |
| | Sets the following: |
| | An array of GUIDs (RPI tenant/client IDs) that this Queue Reader instance/cluster will process messages for when using the Queue Reader’s internal queues setup. |
Callback Service setup
The Callback service processes events raised by an external provider and imports them into the data warehouse. The service uses the CallbackServiceProcessorNumReaders system configuration setting to facilitate parallel event processing, and reads events from the queue defined at the queueProvider.queueNames.callbackServiceQueuePath setting.
Event data is then ingested into a provider-specific RPI_[Provider]Events data warehouse table.
To use the Queue Reader service to process callback service messages, set isCallbackServiceProcessingEnabled to true (the default setting).
Operational endpoints
The following endpoints are available at port 8080 on the Queue Reader service:
- Start: /api/operations/start
- Status: /api/operations/status
- Stop: /api/operations/stop
- Execution Statistics: /api/operations/stats
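As a rough sketch of how these endpoints might be called from a script, the helper below builds the URL for each operation and fetches the status. Several details are assumptions rather than documented behavior: the `http` scheme, the `localhost` default host (for example, after forwarding port 8080 from a Queue Reader pod to your machine), and the use of a plain GET request for the status endpoint. The function names are hypothetical.

```python
from urllib.request import urlopen

# Paths as listed above; the dictionary keys are illustrative labels.
OPERATION_PATHS = {
    "start": "/api/operations/start",
    "status": "/api/operations/status",
    "stop": "/api/operations/stop",
    "stats": "/api/operations/stats",
}


def operation_url(operation, host="localhost", port=8080):
    """Build the URL for one operational endpoint (http scheme assumed)."""
    return f"http://{host}:{port}{OPERATION_PATHS[operation]}"


def fetch_status(host="localhost", port=8080, timeout=5):
    """Fetch the status endpoint body as text.

    Assumes the endpoint answers a plain GET request; check the expected
    HTTP method for your deployment before relying on this.
    """
    with urlopen(operation_url("status", host, port), timeout=timeout) as response:
        return response.read().decode("utf-8")
```

Only the status call is wrapped here, because start and stop change the service state and the HTTP method they expect is not stated in this section.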