# Staying up to date
## Problem
You have a local service you need to keep synchronized with the Entur services.
## Solution
Synchronization from your service to Entur services is handled via the API: you can update the customer or the given consent directly.
Synchronization from Entur services to your systems happens via topic subscription.
Entur exposes several different topics related to customers:
| Change event | Staging topic | Production topic |
|---|---|---|
| Customer changes | customer-changed-staging-$Org | customer-changed-production-$Org |
| Given Consent changes | given-consent-changed-staging-$Org | given-consent-changed-production-$Org |
| Consent changes | consent-changed-staging-$Org | consent-changed-production-$Org |
| Consent Base changes | consent-base-changed-staging-$Org | consent-base-changed-production-$Org |
| Contract changes | contract-change-event-staging-$Org | contract-change-event-production-$Org |
| Loyalty Program changes | loyaltyprogram-change-event-staging-$Org | loyaltyprogram-change-event-production-$Org |
($Org is replaced with your assigned organisation ID, so a topic becomes, for instance, given-consent-changed-staging-35.)
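Since the topic names follow a fixed pattern, a small helper can derive them. The `topicName` function below is a hypothetical convenience for illustration, not part of any Entur SDK:

```kotlin
// Hypothetical helper: builds a topic name from the base name,
// the environment ("staging" or "production") and your organisation ID.
fun topicName(base: String, environment: String, orgId: Int): String =
    "$base-$environment-$orgId"

fun main() {
    // For organisation 35, as in the example above:
    println(topicName("given-consent-changed", "staging", 35))
    // → given-consent-changed-staging-35
}
```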
The CustomerChange and GivenConsentChange objects are kept as small and GDPR-friendly as we can make them: they carry customer numbers only, so use those to look up the full objects in the API after receiving an event. The other change events contain a complete image of the object after each change, so the payloads you fetch from these topics differ slightly.
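The lookup itself is an ordinary authenticated HTTP call. The sketch below only builds such a request with the JDK's built-in `java.net.http` client; the path `/customers/{customerNumber}` and the bearer-token header are assumptions for illustration — use the actual endpoint and credentials you receive from Entur:

```kotlin
import java.net.URI
import java.net.http.HttpRequest

// Builds a lookup request for one customer number received from the topic.
// The URL path below is hypothetical; substitute the real customer API endpoint.
fun customerLookupRequest(customerNumber: String, token: String): HttpRequest =
    HttpRequest.newBuilder()
        .uri(URI.create("https://api.entur.io/customers/$customerNumber")) // hypothetical path
        .header("Authorization", "Bearer $token")
        .GET()
        .build()
```

The request can then be sent with `HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())`.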
An example client for consuming these events:
```kotlin
import io.confluent.kafka.serializers.KafkaAvroDeserializer
import java.time.Duration
import java.util.Properties
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.config.SaslConfigs
import org.apache.kafka.common.serialization.StringDeserializer

val properties = Properties()
properties[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = "https://kafka-0.entur.io:9095,https://kafka-1.entur.io:9095,https://kafka-2.entur.io:9095"
properties[ConsumerConfig.GROUP_ID_CONFIG] = "teamculp-test-1"
properties[ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG] = false
properties[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "earliest"
properties[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
properties[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = KafkaAvroDeserializer::class.java
properties["specific.avro.reader"] = true
properties["schema.registry.url"] = "http://schema-registry.entur.io:8001"
properties[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = "SASL_SSL"
properties[SaslConfigs.SASL_MECHANISM] = "SCRAM-SHA-512"
properties[SaslConfigs.SASL_JAAS_CONFIG] = String.format(
    "org.apache.kafka.common.security.scram.ScramLoginModule required\nusername=\"%s\"\npassword=\"%s\";",
    "username", "password"
)

val consumer: Consumer<String, CustomerChange> = KafkaConsumer(properties)
consumer.subscribe(listOf("customer-changed-staging-...."))

val records = consumer.poll(Duration.ofSeconds(10))
log.info("records fetched: {}", records.count())
for (record in records.records("customer-changed-staging-....")) {
    log.info("record={}", record)
}
// Auto-commit is disabled above, so commit the offsets once the records are safely processed.
consumer.commitSync()
```
Note:
- For the customer services, we use StringDeserializer for the keys.
- Entur uses Let's Encrypt to sign our certificates, so you can use the truststore bundled with your JDK/JRE.
- You will receive the correct URLs, username and password from Entur.
- GROUP_ID_CONFIG must be the same on all clients in a cluster and should be set explicitly. Use a separate group ID when testing.
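One way to honour the last point is to derive the group ID from the deployment environment, so test runs never share committed offsets with production consumers. The `groupIdFor` helper below is a hypothetical sketch of that convention:

```kotlin
// Hypothetical helper: one stable group ID per environment, so all production
// instances share offsets while test runs stay isolated from them.
fun groupIdFor(team: String, environment: String): String =
    "$team-$environment"

fun main() {
    println(groupIdFor("teamculp", "production")) // → teamculp-production
    println(groupIdFor("teamculp", "test"))       // → teamculp-test
}
```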