
Kafka Agent

This agent targets Kafka. It:

  • synchronises Kafka topics
  • synchronises Kafka consumer groups
  • scrapes schema registries for schemas
  • forwards metrics to SDM
  • profiles data (if enabled)

Configuration

agoora-endpoint: local.spoud.io
agoora-insecure: false
agoora-transport-path: /default/kafka

agoora:
  # The scrapper does not run continuously: every {period} it listens for at most {max-samples} messages and at most {max-wait}.
  scrapper:
    max-samples: 100
    period: PT15M
    profiling:
      enabled: true
    hooks:
      enabled: true
  kafka:
    key:
    secret:
    topic-filter-regex: .+
    consumer-group-filter-regex: .+
    bootstrap-servers: kafka:9092
    protocol: PLAINTEXT
    key-store-location:
    key-store-password:
    trust-store-location:
    trust-store-password:
  registry:
    confluent:
      url:
      api-key:
      api-secret:
      public-url:
  property-templates:
    kafka-topic:
    kafka-consumer-group:
  transport:
    agoora-path: ${agoora-transport-path}
  auth:
    server-url: https://${agoora-endpoint}/auth/
    realm: spoud
    user:
      name: to-be-defined
      token: to-be-defined
  logistics:
    endpoint: ${agoora-endpoint}
    insecure: ${agoora-insecure}
  hooks:
    endpoint: ${agoora-endpoint}
    insecure: ${agoora-insecure}
  schema:
    endpoint: ${agoora-endpoint}
    insecure: ${agoora-insecure}
  looker:
    endpoint: ${agoora-endpoint}
    insecure: ${agoora-insecure}
  blob:
    endpoint: ${agoora-endpoint}
    insecure: ${agoora-insecure}
  profiler:
    endpoint: localhost:8089
    insecure: true

quarkus:
  kafka:
    snappy:
      enabled: true
  reactive-messaging:
    health:
      enabled: false
  arc:
    # Needed for the grpc interceptors
    remove-unused-beans: fwk
  http:
    port: 8280
  ssl:
    native: true
  native:
    container-build: true
    additional-build-args:
      - --allow-incomplete-classpath
      - --report-unsupported-elements-at-runtime
      - -H:IncludeResourceBundles=com.sun.org.apache.xerces.internal.impl.msg.XMLMessages
  log:
    category:
      "io.spoud.agoora":
        level: INFO
      "org.apache.kafka":
        level: WARN


  micrometer:
    binder:
      jvm: true
      kafka:
        enabled: true
      grpc-client:
        enabled: true
      system: true
      http-client:
        enabled: true
      mp-metrics:
        enabled: true


rest-confluent-registry/mp-rest/url: ${agoora.registry.confluent.url}
#rest-confluent-registry/mp-rest/trustStore:
#rest-confluent-registry/mp-rest/trustStorePassword:
#rest-confluent-registry/mp-rest/trustStoreType: JKS
#rest-confluent-registry/mp-rest/keyStore:
#rest-confluent-registry/mp-rest/keyStorePassword:
#rest-confluent-registry/mp-rest/keyStoreType: JKS

"%dev":
  agoora-endpoint: local.spoud.io
  agoora-insecure: true
  agoora-transport-path: /default/kafka/kafka
  agoora:
    property-templates:
      kafka-topic: '{ "kafka.manager.url": "https://km.sdm.spoud.io/clusters/sdm/topics/{TOPIC_NAME}" }'
    scrapper:
      max-samples: 10
      period: 2M
    auth:
      server-url: http://${agoora-endpoint}/auth/
      user:
        name: sdm_t_338908ea-30f7-4b73-aff9-7b2a08ccac7f
        token: utQmxZZWC56aFfTrD8cTZSewPA574uTC
    registry:
      confluent:
        url: http://kafka:8081
    schema-cache:
      schema-by-id: 1000
      schema-expiration: 1H
      topic-name-schema-id-cache: 5000
      topic-name-id-expiration: 12H


  quarkus:
    log:
      category:
        "io.spoud.agoora":
          level: INFO
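The `topic-filter-regex` and `consumer-group-filter-regex` settings select which topics and consumer groups the agent synchronises. The following Python sketch shows the effect of a filter, assuming the agent requires the whole name to match (as Java's `Pattern.matches` does). The matching mode, the `filter_names` helper and the topic names are assumptions for illustration only.

```python
import re


def filter_names(names, pattern):
    """Return the names that fully match the filter pattern, in input order.

    Assumption: the agent keeps a name only if the WHOLE name matches
    (full-match semantics, like java.util.regex.Pattern.matches).
    """
    compiled = re.compile(pattern)
    return [name for name in names if compiled.fullmatch(name)]


# Hypothetical topic names, including two internal ones.
topics = ["orders", "payments", "_schemas", "__consumer_offsets"]

# The default `.+` keeps every non-empty topic name.
print(filter_names(topics, ".+"))
# A hypothetical pattern that skips topics starting with "_".
print(filter_names(topics, "[^_].*"))
```

In the YAML file, a pattern such as `[^_].*` has to be quoted (`topic-filter-regex: '[^_].*'`), because an unquoted value starting with `[` is parsed as a YAML sequence.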

SSL

To connect to Kafka over SSL, set these environment variables:

AGOORA_KAFKA_BOOTSTRAP_SERVERS=kafka:9092
AGOORA_KAFKA_PROTOCOL=SSL
AGOORA_KAFKA_KEY_STORE_LOCATION=/data/keys/...
AGOORA_KAFKA_KEY_STORE_PASSWORD=123456
AGOORA_KAFKA_TRUST_STORE_LOCATION=/data/keys/...
AGOORA_KAFKA_TRUST_STORE_PASSWORD=123456
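Quarkus reads its configuration through MicroProfile Config, which also looks each property up as an environment variable: every character that is neither alphanumeric nor `_` becomes `_`, and the result is upper-cased. The sketch below applies that rule to a few properties from the configuration above. `env_var_name` is an illustrative helper, not part of the agent.

```python
import re


def env_var_name(property_name: str) -> str:
    """Map a config property name to its environment-variable form.

    MicroProfile Config rule: replace every character that is neither
    alphanumeric nor '_' with '_', then convert to upper case.
    """
    return re.sub(r"[^A-Za-z0-9_]", "_", property_name).upper()


for prop in (
    "agoora.kafka.protocol",
    "agoora.kafka.key-store-location",
    "agoora.kafka.trust-store-password",
):
    print(f"{prop} -> {env_var_name(prop)}")
```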

Kafka Permission Requirements

Minimal permissions required for sdm-kafka-agent:

  • DESCRIBE GROUP '*': describe all consumer groups (needed for data subscriptions).
  • DESCRIBE TOPIC '*': describe all topics (needed for data offers).

Confluent Cloud (ccloud):

ccloud kafka acl create --allow --service-account-id <account-id> --operation describe --topic '*'
ccloud kafka acl create --allow --service-account-id <account-id> --operation describe --consumer-group '*'

Configuration:

AGOORA_KAFKA_PROTOCOL=SASL_SSL
AGOORA_KAFKA_KEY=
AGOORA_KAFKA_SECRET=
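Confluent Cloud clients authenticate with SASL/PLAIN over TLS, passing the API key as the username and the API secret as the password. The sketch below shows the standard Kafka client properties this corresponds to. It assumes the agent turns `AGOORA_KAFKA_KEY` and `AGOORA_KAFKA_SECRET` into these properties in this way, which the original does not state; the function name, broker address and credentials are placeholders.

```python
def sasl_plain_properties(bootstrap_servers: str, key: str, secret: str) -> dict:
    """Build standard Kafka client properties for SASL_SSL with the PLAIN mechanism.

    Assumption: the agent maps agoora.kafka.key / agoora.kafka.secret to the
    SASL username / password like this. Values containing double quotes
    would need escaping; this sketch does not handle that.
    """
    jaas = (
        "org.apache.kafka.common.security.plain.PlainLoginModule required "
        f'username="{key}" password="{secret}";'
    )
    return {
        "bootstrap.servers": bootstrap_servers,
        "security.protocol": "SASL_SSL",
        "sasl.mechanism": "PLAIN",
        "sasl.jaas.config": jaas,
    }


# Placeholder broker and credentials.
props = sasl_plain_properties("broker.example.com:9092", "MY_API_KEY", "MY_API_SECRET")
for name, value in props.items():
    print(f"{name}={value}")
```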

Schema cache settings

The schema-by-id cache holds full schemas keyed by schema id; its entries can take up a lot of the agent's memory. The topic-name cache holds only the schema id for each topic name, so its entries are much smaller.

Configuration:

AGOORA_SCHEMA_CACHE_SCHEMA_BY_ID: number of schemas to cache by id (default 1000)
AGOORA_SCHEMA_CACHE_TOPIC_NAME_SCHEMA_ID_CACHE: number of schema ids to cache by topic name (default 1000)
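To illustrate how the two caches work together, here is a hypothetical size-bounded cache with expiry: least-recently-used eviction plus a time-to-live per entry, loosely mirroring `schema-by-id`/`schema-expiration` and `topic-name-schema-id-cache`/`topic-name-id-expiration` from the dev profile. The eviction policy, the `BoundedTtlCache` class and the sample schema are assumptions, not the agent's documented behaviour.

```python
import time
from collections import OrderedDict


class BoundedTtlCache:
    """Hypothetical cache: at most `max_entries` items, each valid for `ttl_seconds`."""

    def __init__(self, max_entries, ttl_seconds, clock=time.monotonic):
        self.max_entries = max_entries
        self.ttl_seconds = ttl_seconds
        self.clock = clock
        self._items = OrderedDict()  # key -> (expires_at, value), least recently used first

    def get(self, key):
        entry = self._items.get(key)
        if entry is None:
            return None
        expires_at, value = entry
        if self.clock() >= expires_at:
            del self._items[key]  # expired: drop it and report a miss
            return None
        self._items.move_to_end(key)  # mark as most recently used
        return value

    def put(self, key, value):
        self._items[key] = (self.clock() + self.ttl_seconds, value)
        self._items.move_to_end(key)
        while len(self._items) > self.max_entries:
            self._items.popitem(last=False)  # evict the least recently used entry


# Two caches using the dev profile values: 1000 schemas for 1 h, 5000 topic ids for 12 h.
schema_by_id = BoundedTtlCache(max_entries=1000, ttl_seconds=3600)
schema_id_by_topic = BoundedTtlCache(max_entries=5000, ttl_seconds=12 * 3600)

schema_id_by_topic.put("orders", 42)  # small entry: topic name -> schema id
schema_by_id.put(42, '{"type": "record", "name": "Order", "fields": []}')  # large entry: full schema
print(schema_by_id.get(schema_id_by_topic.get("orders")))
```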

Kafka ACLs (bin/kafka-acls.sh)

bin/kafka-acls.sh --bootstrap-server <bootstrap-server> --add --allow-principal <principal> --operation Describe --topic '*'
bin/kafka-acls.sh --bootstrap-server <bootstrap-server> --add --allow-principal <principal> --operation Describe --group '*'

Monitoring

  • host:8280/q/health => global health endpoint
  • host:8280/q/health/live => liveness probe
  • host:8280/q/health/ready => readiness probe
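Quarkus serves these endpoints through SmallRye Health, which answers with a JSON document whose top-level `status` is `UP` or `DOWN` (HTTP 200 or 503 respectively). A minimal probe sketch using only the Python standard library follows; `probe` and `health_status` are illustrative names, and the host and port depend on your deployment.

```python
import json
import urllib.error
import urllib.request


def health_status(body: str) -> str:
    """Return the overall status ("UP" or "DOWN") from a SmallRye Health JSON body."""
    return json.loads(body)["status"]


def probe(url: str, timeout: float = 5.0) -> bool:
    """GET a health endpoint and return True only if the agent reports UP."""
    try:
        with urllib.request.urlopen(url, timeout=timeout) as response:
            body = response.read().decode("utf-8")
    except urllib.error.HTTPError as err:
        body = err.read().decode("utf-8")  # a 503 still carries the JSON report
    except OSError:
        return False  # unreachable: connection refused, DNS failure, timeout
    try:
        return health_status(body) == "UP"
    except (ValueError, KeyError):
        return False  # not a health report (e.g. wrong path)


print(health_status('{"status": "UP", "checks": []}'))  # UP
```

A deployment script could call `probe` on `/q/health/ready` and wait until it returns `True` before routing traffic to the agent.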

Additional properties (deep dive tools)

You can add properties to the Data Port and the Data Subscription State. The value must be JSON, and it can contain the following variables:

Variable              Description
TOPIC_NAME            Topic name
CONSUMER_GROUP_NAME   Consumer group name
RESOURCE_ID           Data port id or Data subscription state id

Examples:

AGOORA_PROPERTY_TEMPLATES_KAFKA_TOPIC={"data.quality.dashboard":"https://grafana-dev.sdm.spoud.io/d/Zs9TGX7Mk/data-quality-detail?orgId=1&resourceId={RESOURCE_ID}&var-resourceId={RESOURCE_ID}","kafka.manager.url":"https://km.sdm.spoud.io/clusters/sdm/topics/{TOPIC_NAME}","prometheus.url" : "https://prometheus.sdm.spoud.io/graph?g0.range_input=1w&g0.expr=kafka_topic_highwater%7Btopic%3D%22{TOPIC_NAME}%22%7D&g0.tab=0"}
AGOORA_PROPERTY_TEMPLATES_KAFKA_CONSUMER_GROUP={"kafka.manager.url":"https://km.sdm.spoud.io/clusters/sdm/consumers/{CONSUMER_GROUP_NAME}/topic/{TOPIC_NAME}/type/KF"}
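The substitution can be pictured as plain placeholder replacement followed by JSON parsing. This is a sketch under that assumption, not the agent's code; `render_properties` and the sample values are hypothetical. `str.format()` is deliberately not used, because the JSON's own braces would be treated as format fields.

```python
import json


def render_properties(template: str, variables: dict) -> dict:
    """Replace {NAME} placeholders in a JSON template, then parse the result.

    Assumption: placeholders are substituted as plain strings. Values are not
    JSON-escaped, so a value containing '"' would break the parse.
    """
    rendered = template
    for name, value in variables.items():
        rendered = rendered.replace("{" + name + "}", value)
    return json.loads(rendered)


template = '{"kafka.manager.url": "https://km.sdm.spoud.io/clusters/sdm/topics/{TOPIC_NAME}"}'
props = render_properties(template, {"TOPIC_NAME": "orders", "RESOURCE_ID": "1234"})
print(props["kafka.manager.url"])  # https://km.sdm.spoud.io/clusters/sdm/topics/orders
```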