Redpoint Interaction v7.x Documentation

Admin: Queue Reader setup

Overview

The RPI Queue Reader service drains Queue Listener and RPI Realtime queues. This section describes how to set up the service.

Queue Listener setup

Queue listeners monitor a "listener queue" for the arrival of data. Data arrives as JSON packages placed on the queue by, for example, an external system or the submission of a web form. Downstream queue activities can then use this data to execute offers. A typical use is sending an email after a customer makes a purchase or submits a web form on a landing page.

Refer to the RPI Reference Guide: Interaction Designer Queue Listener section for more details on queue listeners. 

To configure RPI to use queue listeners:

  1. Configure the listener queue provider in the Queue Listener Providers configuration interface. 

  2. Depending on your situation, use one of the following methods to update your Helm chart using the RPI Helm Assistant:

    • If you’re deploying RPI v7.7 for the first time, use the process outlined in the Initial deployment process section. In the Generate tab, scroll down to the Services section and locate Queue Reader.

    • If you’ve already deployed RPI v7.7 using the RPI Helm Assistant and want to add this functionality later on, you’ll be editing your overrides.yaml file directly as described in the Post-deployment update process section. In the Reference tab, you’ll find the relevant settings in the queuereader and realtimeapi sections.

Queue Reader settings

The Queue Reader is enabled by default. The following table explains what each of the settings means (realtimeConfiguration is described in the next section):

Setting

Description

queuereader.enabled

Sets whether Queue Reader is enabled (true) or not (false). It is enabled (true) by default.

queuereader.replicas

Sets the number of Kubernetes pod replicas (instances) to run for the RPI Queue Reader service.

Increasing it runs more Queue Reader pods in parallel (which can increase throughput), subject to how you’ve configured the Queue Reader mode (e.g., distributed vs non-distributed) and any autoscaling you may have enabled.

queuereader.errorQueuePath

Sets the separate queue to which messages that result in an error (e.g., missing trigger key or malformed JSON) are pushed. The commented-out setting listenerQueueErrorTTLDays (default: 14 days) controls how many days messages routed to this Queue Listener "error" queue are retained. After that period, a message is eligible to expire or be purged, subject to the underlying queue provider's TTL support.

queuereader.nonActiveQueuePath

Sets the separate queue to which messages received for an inactive trigger are pushed. How long it takes for a trigger to be considered inactive is set by the commented-out setting listenerQueueNonActiveTTLDays (default: 14 days).

queuereader.autoscaling.enabled

Turns Horizontal Pod Autoscaling (HPA) on/off for the Queue Reader deployment.

  • true: the chart creates an HPA so Kubernetes can automatically scale the number of queuereader pods based on the autoscaling targets you configure (for example queuereader.autoscaling.minReplicas, maxReplicas, and metrics).

  • false (default): no HPA is created; the Queue Reader runs at the fixed replica count set by queuereader.replicas.
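As a rough illustration, the settings in the table above might appear in overrides.yaml as sketched below. This is an assumption-laden example, not the chart's definitive layout: the nesting is inferred from the dotted setting names, the queue names are hypothetical placeholders, and the replica counts are arbitrary. The Reference tab of the RPI Helm Assistant remains the authoritative source.

```yaml
# Sketch only: nesting inferred from the dotted setting names in the table.
queuereader:
  enabled: true              # default
  replicas: 2                # arbitrary example value
  errorQueuePath: example-listener-errors          # hypothetical queue name
  nonActiveQueuePath: example-listener-nonactive   # hypothetical queue name
  autoscaling:
    enabled: false           # default; the fixed replica count above applies
    # Only relevant when enabled is true (example values):
    # minReplicas: 1
    # maxReplicas: 4
```

With autoscaling.enabled set to false, the replicas value alone determines how many Queue Reader pods run.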

Realtime Queue settings

Realtime event processing can be operated in two modes (set in queuereader.realtimeConfiguration.isDistributed):

  • Non-distributed mode: All work for a single queue is handled by a single service. There is no need for an external queue or cache to hold interim data. This is the default setting (queuereader.realtimeConfiguration.isDistributed: false), and should be adequate for the vast majority of cases. If you’re using non-distributed mode, you do not need to configure internalCache and internalQueues.

  • Distributed mode: Allows more than one Queue Reader service to drain the same queue, so you can scale out to improve processing performance. Interim data is stored in an external (Redis) cache and an internal RabbitMQ queue broker, which also protects against data loss. When setting queuereader.realtimeConfiguration.isDistributed: true, you should also configure internalCache and internalQueues.

The following table describes the settings you need to configure when using distributed mode:

Setting

Description

queuereader.internalCache.provider

Internal cache for distributed processing. By default, this is set to redis, which is currently the only supported cache provider for this scenario.

queuereader.internalCache.type

Sets the following:

  • internal: Use the chart-managed Redis

  • external: Use your own Redis instance; if using external, also set redisSettings.connectionString

queuereader.internalQueues.provider

Internal queue for distributed processing. By default, this is set to rabbitmq, which is currently the only supported internal queue provider for this scenario.

queuereader.internalQueues.type

Sets the following:

  • internal: Use the chart-managed RabbitMQ

  • external: Use your own RabbitMQ instance; if using external, also set internalQueues.rabbitmqSettings

queuereader.internalQueues.tenantIds

An array of GUIDs (RPI tenant/client IDs) that this Queue Reader instance/cluster will process messages for when using the Queue Reader’s internal queues setup.
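A distributed-mode configuration could look something like the following sketch. It assumes that the dotted setting names map directly onto nested YAML keys and that redisSettings sits under internalCache; neither is confirmed on this page. The GUID and connection string are placeholders.

```yaml
# Sketch only: key placement is inferred from the setting names above.
queuereader:
  realtimeConfiguration:
    isDistributed: true      # default is false (non-distributed mode)
  internalCache:
    provider: redis          # currently the only supported cache provider
    type: internal           # use the chart-managed Redis
    # With type: external, also supply your own connection string
    # (assumed to live under internalCache):
    # redisSettings:
    #   connectionString: "<your-redis-connection-string>"
  internalQueues:
    provider: rabbitmq       # currently the only supported queue provider
    type: internal           # use the chart-managed RabbitMQ
    tenantIds:
      - "00000000-0000-0000-0000-000000000000"   # placeholder tenant GUID
```

Choosing internal for both types keeps the example self-contained, since the chart then manages Redis and RabbitMQ itself.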

Callback Service setup

The Callback service processes events raised by an external provider and imports them into the data warehouse. The service uses the CallbackServiceProcessorNumReaders system configuration setting to facilitate parallel event processing, and reads events from the queue defined in the queueProvider.queueNames.callbackServiceQueuePath setting.

Event data is then ingested into a provider-specific RPI_[Provider]Events data warehouse table.

To use the Queue Reader service to process callback service messages, set isCallbackServiceProcessingEnabled to true (the default setting).
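For orientation, the two callback-related settings might be expressed in overrides.yaml roughly as follows. The placement of both keys is an assumption: this page does not state which parent section holds isCallbackServiceProcessingEnabled, nor whether queueProvider is a top-level key. The queue name is a hypothetical placeholder.

```yaml
# Assumption: key placement is inferred, not confirmed by this page.
queuereader:
  isCallbackServiceProcessingEnabled: true   # default
queueProvider:
  queueNames:
    callbackServiceQueuePath: example-callback-queue   # hypothetical queue name
```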

Operational endpoints

The following endpoints are available at port 8080 on the Queue Reader service:

  • Start: /api/operations/start

  • Status: /api/operations/status

  • Stop: /api/operations/stop

  • Execution Statistics: /api/operations/stats