Kafka Agent¶
This agent targets Kafka. It will:
- Synchronise Kafka topics
- Synchronise Kafka consumer groups
- Scrape schema registries for schemas
- Forward metrics to SDM
- Profile data (if enabled)
Configuration¶
agoora-endpoint: local.spoud.io
agoora-insecure: false
agoora-transport-path: /default/kafka

agoora:
  # The scrapper does not run 100% of the time. It starts listening every {period}, for at most {max-samples} messages or {max-wait}
  scrapper:
    max-samples: 100
    period: PT15M
  profiling:
    enabled: true
  hooks:
    enabled: true
  kafka:
    key:
    secret:
    topic-filter-regex: .+
    consumer-group-filter-regex: .+
    bootstrap-servers: kafka:9092
    protocol: PLAINTEXT
    key-store-location:
    key-store-password:
    trust-store-location:
    trust-store-password:
  registry:
    confluent:
      url:
      api-key:
      api-secret:
      public-url:
  property-templates:
    kafka-topic:
    kafka-consumer-group:
  transport:
    agoora-path: ${agoora-transport-path}
  auth:
    server-url: https://${agoora-endpoint}/auth/
    realm: spoud
    user:
      name: to-be-defined
      token: to-be-defined
  logistics:
    endpoint: ${agoora-endpoint}
    insecure: ${agoora-insecure}
  hooks:
    endpoint: ${agoora-endpoint}
    insecure: ${agoora-insecure}
  schema:
    endpoint: ${agoora-endpoint}
    insecure: ${agoora-insecure}
  looker:
    endpoint: ${agoora-endpoint}
    insecure: ${agoora-insecure}
  blob:
    endpoint: ${agoora-endpoint}
    insecure: ${agoora-insecure}
  profiler:
    endpoint: localhost:8089
    insecure: true

quarkus:
  kafka:
    snappy:
      enabled: true
  reactive-messaging:
    health:
      enabled: false
  arc:
    # Needed for the grpc interceptors
    remove-unused-beans: fwk
  http:
    port: 8280
  ssl:
    native: true
  native:
    container-build: true
    additional-build-args:
      - --allow-incomplete-classpath
      - --report-unsupported-elements-at-runtime
      - -H:IncludeResourceBundles=com.sun.org.apache.xerces.internal.impl.msg.XMLMessages
  log:
    category:
      "io.spoud.agoora":
        level: INFO
      "org.apache.kafka":
        level: WARN
  micrometer:
    binder:
      jvm: true
      kafka:
        enabled: true
      grpc-client:
        enabled: true
      system: true
      http-client:
        enabled: true
      mp-metrics:
        enabled: true

rest-confluent-registry/mp-rest/url: ${agoora.registry.confluent.url}
#rest-confluent-registry/mp-rest/trustStore:
#rest-confluent-registry/mp-rest/trustStorePassword:
#rest-confluent-registry/mp-rest/trustStoreType: JKS
#rest-confluent-registry/mp-rest/keyStore: JKS
#rest-confluent-registry/mp-rest/keyStorePassword: JKS
#rest-confluent-registry/mp-rest/keyStoreType: JKS

"%dev":
  agoora-endpoint: local.spoud.io
  agoora-insecure: true
  agoora-transport-path: /default/kafka/kafka
  agoora:
    property-templates:
      kafka-topic: '{ "kafka.manager.url": "https://km.sdm.spoud.io/clusters/sdm/topics/{TOPIC_NAME}" }'
    scrapper:
      max-samples: 10
      period: 2M
    auth:
      server-url: http://${agoora-endpoint}/auth/
      user:
        name: sdm_t_338908ea-30f7-4b73-aff9-7b2a08ccac7f
        token: utQmxZZWC56aFfTrD8cTZSewPA574uTC
    registry:
      confluent:
        url: http://kafka:8081
    schema-cache:
      schema-by-id: 1000
      schema-expiration: 1H
      topic-name-schema-id-cache: 5000
      topic-name-id-expiration: 12H
  quarkus:
    log:
      category:
        "io.spoud.agoora":
          level: INFO
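Any of these keys can also be supplied as environment variables, which is how the following sections override them (for example, agoora.kafka.bootstrap-servers becomes AGOORA_KAFKA_BOOTSTRAP_SERVERS). A minimal docker-compose sketch, assuming the agent is run as a container; the image name and credentials below are placeholders, not values from this page:

services:
  agoora-kafka-agent:
    image: <agent-image>   # placeholder: use the image you were provided
    environment:
      AGOORA_KAFKA_BOOTSTRAP_SERVERS: kafka:9092
      AGOORA_KAFKA_PROTOCOL: PLAINTEXT
      AGOORA_AUTH_USER_NAME: <agent-user>    # assumed mapping of agoora.auth.user.name
      AGOORA_AUTH_USER_TOKEN: <agent-token>  # assumed mapping of agoora.auth.user.token
    ports:
      - "8280:8280"   # quarkus.http.port from the configuration above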
SSL¶
If you want to use SSL with Kafka, use these environment variables:
AGOORA_KAFKA_BOOTSTRAP_SERVERS=kafka:9092
AGOORA_KAFKA_PROTOCOL=SSL
AGOORA_KAFKA_KEY-STORE_LOCATION=/data/keys/...
AGOORA_KAFKA_KEY-STORE_PASSWORD=123456
AGOORA_KAFKA_TRUST-STORE_LOCATION=/data/keys/...
AGOORA_KAFKA_TRUST-STORE_PASSWORD=123456
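When the agent runs in a container, the key and trust stores referenced by these variables have to be mounted into it. A docker-compose sketch; the image name, file names and passwords are placeholders:

services:
  agoora-kafka-agent:
    image: <agent-image>   # placeholder
    volumes:
      - ./keys:/data/keys:ro   # directory holding the key store and trust store files
    environment:
      AGOORA_KAFKA_BOOTSTRAP_SERVERS: kafka:9092
      AGOORA_KAFKA_PROTOCOL: SSL
      AGOORA_KAFKA_KEY-STORE_LOCATION: /data/keys/kafka.keystore.jks
      AGOORA_KAFKA_KEY-STORE_PASSWORD: <keystore-password>
      AGOORA_KAFKA_TRUST-STORE_LOCATION: /data/keys/kafka.truststore.jks
      AGOORA_KAFKA_TRUST-STORE_PASSWORD: <truststore-password>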
Kafka Permission Requirements¶
Minimal permissions required for sdm-kafka-agent:
DESCRIBE GROUP '*': read all consumer groups (needed for data subscriptions)
DESCRIBE TOPIC '*': read all topics (needed for data offers)
Confluent Cloud (ccloud)¶
ccloud kafka acl create --allow --service-account-id <account-id> --operation describe --topic '*'
ccloud kafka acl create --allow --service-account-id <account-id> --operation describe --consumer-group '*'
Configuration:
AGOORA_KAFKA_PROTOCOL=SASL_SSL
AGOORA_KAFKA_KEY=
AGOORA_KAFKA_SECRET=
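Put together, a Confluent Cloud setup looks roughly like the environment fragment below. The bootstrap endpoint and credentials are placeholders; the AGOORA_REGISTRY_CONFLUENT_* names are the assumed environment mapping of the agoora.registry.confluent.* keys shown in the configuration above, for the case where the Confluent Cloud schema registry should be scraped as well:

environment:
  AGOORA_KAFKA_BOOTSTRAP_SERVERS: <bootstrap-endpoint>:9092   # from the Confluent Cloud cluster settings
  AGOORA_KAFKA_PROTOCOL: SASL_SSL
  AGOORA_KAFKA_KEY: <kafka-api-key>         # API key of the service account holding the ACLs above
  AGOORA_KAFKA_SECRET: <kafka-api-secret>
  AGOORA_REGISTRY_CONFLUENT_URL: <schema-registry-url>
  AGOORA_REGISTRY_CONFLUENT_API_KEY: <registry-api-key>
  AGOORA_REGISTRY_CONFLUENT_API_SECRET: <registry-api-secret>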
Schema cache settings¶
The schema-by-id cache holds the full schema by id; this can become a large object in the agent's memory. The topic-name/schema-id cache holds only the schema id by topic name, which is a smaller object.
Configuration:
AGOORA_SCHEMA_CACHE_SCHEMA_BY_ID: number of schemas to cache by id (default 1000)
AGOORA_SCHEMA_CACHE_TOPIC_NAME_SCHEMA_ID_CACHE: number of schema ids to cache by topic name (default 1000)
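The same limits can be kept in the YAML configuration under agoora.schema-cache. The sketch below reuses the keys from the %dev profile above; the meaning of the expiration keys is inferred from their names:

agoora:
  schema-cache:
    schema-by-id: 1000                 # max number of full schemas cached by schema id
    schema-expiration: 1H              # how long a cached schema is kept (inferred)
    topic-name-schema-id-cache: 5000   # max number of topic-name -> schema-id entries
    topic-name-id-expiration: 12H      # how long a cached topic-name/schema-id entry is kept (inferred)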
Kafka ACLs (bin/kafka-acls.sh)¶
bin/kafka-acls.sh --bootstrap-server <broker> --add --allow-principal <principal> --operation Describe --topic '*'
bin/kafka-acls.sh --bootstrap-server <broker> --add --allow-principal <principal> --operation Describe --group '*'
Monitoring¶
host:8280/q/health => Global health endpoint
host:8280/q/health/live => liveness probe
host:8280/q/health/ready => readiness probe
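These endpoints map directly onto Kubernetes probes. A sketch, assuming the health endpoints are served on the agent's HTTP port (quarkus.http.port, 8280 in the configuration above; adjust if yours differs):

livenessProbe:
  httpGet:
    path: /q/health/live
    port: 8280   # must match quarkus.http.port
  initialDelaySeconds: 30
  periodSeconds: 10
readinessProbe:
  httpGet:
    path: /q/health/ready
    port: 8280
  initialDelaySeconds: 10
  periodSeconds: 10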
Additional properties (deep dive tools)¶
You can add properties to the Data Port and the Data Subscription State. The value must be a JSON object, and the following variables can be used inside it:
Variable | Description
---|---
TOPIC_NAME | Topic name
CONSUMER_GROUP_NAME | Consumer group name
RESOURCE_ID | Data port id or Data subscription state id
Examples:
AGOORA_PROPERTY_TEMPLATES_KAFKA_TOPIC={"data.quality.dashboard":"https://grafana-dev.sdm.spoud.io/d/Zs9TGX7Mk/data-quality-detail?orgId=1&resourceId={RESOURCE_ID}&var-resourceId={RESOURCE_ID}","kafka.manager.url":"https://km.sdm.spoud.io/clusters/sdm/topics/{TOPIC_NAME}","prometheus.url" : "https://prometheus.sdm.spoud.io/graph?g0.range_input=1w&g0.expr=kafka_topic_highwater%7Btopic%3D%22{TOPIC_NAME}%22%7D&g0.tab=0"}
AGOORA_PROPERTY_TEMPLATES_KAFKA_CONSUMER_GROUP={"kafka.manager.url":"https://km.sdm.spoud.io/clusters/sdm/consumers/{CONSUMER_GROUP_NAME}/topic/{TOPIC_NAME}/type/KF"}
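The same templates can be set in the YAML configuration instead of environment variables, under agoora.property-templates (single-quoted so the JSON is passed through as-is). The URLs below are taken from the examples above:

agoora:
  property-templates:
    kafka-topic: '{ "kafka.manager.url": "https://km.sdm.spoud.io/clusters/sdm/topics/{TOPIC_NAME}" }'
    kafka-consumer-group: '{ "kafka.manager.url": "https://km.sdm.spoud.io/clusters/sdm/consumers/{CONSUMER_GROUP_NAME}/topic/{TOPIC_NAME}/type/KF" }'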