Staying up to date

Problem

You have a local service you need to keep synchronized with the Entur services.

Solution

Flow chart

Synchronization from your service to Entur services is handled via the API. You can update directly on the customer or the given consent.

Synchronization from Entur services to your systems happens via topic subscription.

Entur exposes several different topics related to customers:

EnvironmentStagingProduction
Customer changescustomer-changed-staging-$Orgcustomer-changed-production-$Org
Given Consent changesgiven-consent-changed-staging-$Orggiven-consent-changed-production-$Org
Consent changesconsent-changed-staging-$Orgconsent-changed-production-$Org
Consent Base changesconsent-base-changed-staging-$Orgconsent-base-changed-production-$Org
Contract changescontract-change-event-staging-$Orgcontract-change-event-production-$Org
Loyalty Program changesloyaltyprogram-change-event-staging-$Orgloyaltyprogram-change-event-production-$Org

($Org is replaced with your assigned organisation Id, so the topic becomes for instance given-consent-changed-staging-35)

The CustomerChange and GivenConsentChange objects are as small and GDPR friendly as we can make them. Use the customerNumbers to look up in the API after receiving them. The other change events give a complete image of the object after each change.

The elements you fetch from these queues are slightly different:

An example client for consuming these events:

val properties = Properties()
properties[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = "https://kafka-0.entur.io:9095,https://kafka-1.entur.io:9095,https://kafka-2.entur.io:9095"
properties[ConsumerConfig.GROUP_ID_CONFIG] = "teamculp-test-1"
properties[ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG] = false
properties[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "earliest"
properties[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
properties[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = KafkaAvroDeserializer::class.java
properties["specific.avro.reader"] = true
properties["schema.registry.url"] = "http://schema-registry.entur.io:8001"

properties[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = "SASL_SSL"

properties[SaslConfigs.SASL_MECHANISM] = "SCRAM-SHA-512"
properties[SaslConfigs.SASL_JAAS_CONFIG] = String.format("org.apache.kafka.common.security.scram.ScramLoginModule required\nusername=\"%s\"\npassword=\"%s\";", "username", "password")

val consumer: Consumer<String, CustomerChange> = KafkaConsumer(properties)

consumer.subscribe(asList("customer-changed-staging-...."))
val records = consumer.poll(Duration.of(10, ChronoUnit.SECONDS))

log.info("records fetched: {}", records.count())

if (!records.isEmpty)
    for (record in records.records("customer-changed-staging-....")) {
        log.info("record={}", record)
    }

Note:

  • for the customer services, we are using StringDeserializer for the keys.
  • Entur uses LetsEncrypt to sign our certificates. You should use the truststore that is bundeled with your JDK/JRE
  • You will receive the correct urls, username and password from Entur.
  • GROUP_ID_CONFIG must be the same on all clients in a cluster. It should be set explicitly. Also, use another group Id when testing.