How does one create a Kafka Streams setup to read AVRO-encoded messages? #375

@StankovicMarko

Description

Hi,

I've been trying to use streams to read messages from a Kafka topic that contains AVRO-encoded messages; the AVRO schemas live in AWS Glue.

I've tried many variations of the configuration below, but haven't made any progress:

(defn build-topology [builder input-topics]
  (let [topic-maps (map topic-config input-topics)]
    (-> (jstreams/kstreams builder topic-maps)
        ;; no real processing yet, just print every record so I can see it arrive
        (jstreams/peek (fn [[k v]] (prn k v)))))
  builder)

(mount/defstate kafka
  :start (let [builder (jstreams/streams-builder)
               topology (build-topology builder ["test"])
               application-config {"bootstrap.servers" (:kafka-servers config/config)
                                   "consumer.auto.offset.reset" "latest"
                                   "application.id" (str (:profile (mount/args)) "-match-status-" (random-uuid))
                                   "key.deserializer" "org.apache.kafka.common.serialization.StringDeserializer"
                                   "value.deserializer" "com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryKafkaDeserializer"
                                   "avroRecordType" "GENERIC_RECORD"
                                   "registry.name" "test"
                                   "schemaName" "test"}
               application (jstreams/kafka-streams
                            topology
                            application-config)]
           (timbre/info "Starting Kafka Streams application..." {:application-config application-config})
           (jstreams/start application)

           {:application application
            :application-config application-config
            :builder builder
            :topology topology})

  :stop (do (jstreams/close (:application kafka))
            {}))
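
(For reference, `topic-config` isn't shown above; it's just a small helper of mine that turns a topic name into a jackdaw topic map, along these lines; the exact shape here is only illustrative:)

(defn topic-config
  [topic-name]
  ;; illustrative sketch: a bare jackdaw topic map with no serdes attached,
  ;; since the (de)serializers are set in the application config above
  {:topic-name topic-name})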

I've set up a basic consumer to make sure I'm not losing my sanity, and this works:

(def consumer (jc/subscribed-consumer {"bootstrap.servers" (:kafka-servers config/config)
                                       "auto.offset.reset" "latest"
                                       "key.deserializer" "org.apache.kafka.common.serialization.StringDeserializer"
                                       "value.deserializer" "com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryKafkaDeserializer"
                                       "avroRecordType" "GENERIC_RECORD"
                                       "registry.name" "test123"
                                       "group.id" "test123"
                                       "max.poll.records" (int 1)
                                       "schemaName" "test123"}
                                      [{:topic-name "test123"}]))

(jc/poll consumer 1000)

(.close consumer)
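
Before the (.close consumer), pulling the decoded values out of the polled record maps is enough to eyeball the data; jackdaw's poll returns record maps, so something like this prints the GenericRecords:

;; print just the decoded Avro values from the polled record maps
(->> (jc/poll consumer 1000)
     (map :value)
     (run! prn))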

I think using streams with Glue AVRO-encoded messages is possible, but my lack of skill/knowledge is showing.
Any guidance would be appreciated.
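
For what it's worth, my best guess so far (untested, so very possibly wrong) is that Kafka Streams ignores key.deserializer/value.deserializer and resolves serdes instead, so the Glue deserializer would need to be wrapped in a Serde and put on the topic map (or set as default.value.serde). The Glue serializer class name below is my assumption, mirroring the deserializer:

;; rough, untested sketch: wrap the Glue (de)serializer in a Kafka Serde and
;; attach it to the jackdaw topic map instead of using consumer-level deserializers
(import '(org.apache.kafka.common.serialization Serdes)
        '(com.amazonaws.services.schemaregistry.serializers GlueSchemaRegistryKafkaSerializer)
        '(com.amazonaws.services.schemaregistry.deserializers GlueSchemaRegistryKafkaDeserializer))

(defn glue-value-serde
  [props] ;; props = the same Glue settings map as above (avroRecordType, registry.name, schemaName)
  (Serdes/serdeFrom (doto (GlueSchemaRegistryKafkaSerializer.) (.configure props false))
                    (doto (GlueSchemaRegistryKafkaDeserializer.) (.configure props false))))

(comment
  (assoc (topic-config "test")
         :key-serde (Serdes/String)
         :value-serde (glue-value-serde {"avroRecordType" "GENERIC_RECORD"
                                         "registry.name" "test"
                                         "schemaName" "test"})))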

I just want to mention that I tried asking for help in Slack and haven't gotten a response for a few days now, which is why I'm opening an issue.
