I have a Flink DataStream job that flows like this:
- before DataStream definition
  - define a list of Kafka topics
  - create a HashMap of (String KafkaTopic -> SideOutput)
  - define the Kafka source
- during DataStream definition
  - add the Kafka source to the env using fromSource
  - define a process function that pulls the Kafka topic from the message metadata, looks up the SideOutput in the HashMap of (String KafkaTopic -> SideOutput), and emits the message to the output tag returned from the HashMap
- after DataStream definition
  - iterate over the HashMap of (String KafkaTopic -> SideOutput)
  - create a JDBC sink with a unique insert statement for each side output
  - add a sink to each side OutputTag
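The lookup at the center of these steps can be modeled in plain Java. This is only a sketch with no Flink dependency: a string id stands in for each side OutputTag, and the class name, the `side-` prefix, the topic names, and the two-column table layout are all hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of the routing table; no Flink types are used here.
class TopicRouting {
    // Hypothetical naming rule: one side output id per topic.
    static String tagIdFor(String topic) {
        return "side-" + topic;
    }

    // Hypothetical per-topic insert statement (assumed two-column table).
    static String insertFor(String topic) {
        return "INSERT INTO " + topic.replace('.', '_')
                + " (msg_key, payload) VALUES (?, ?)";
    }

    // Built once, before the stream is defined.
    static Map<String, String> buildRoutes(List<String> topics) {
        Map<String, String> routes = new LinkedHashMap<>();
        for (String topic : topics) {
            routes.put(topic, tagIdFor(topic));
        }
        return routes;
    }

    // What the process function does per record: look up the tag by topic.
    // Returns null for a topic that was not known when the map was built.
    static String route(Map<String, String> routes, String topic) {
        return routes.get(topic);
    }
}
```

A topic that was not in the list when the map was built has no entry, so `route` returns null for it. That is the case I am trying to handle.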
I'd like to avoid building the list of Kafka topics before defining the stream. I want to use the topic-pattern (regex) subscription of the KafkaSource to consume from all topics that match the pattern.
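The KafkaSource builder takes a `java.util.regex.Pattern` through `setTopicPattern`. Below is a plain-Java sketch of which topic names such a pattern would select, assuming the whole topic name has to match. The class name, the pattern, and the topic names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

// Plain-Java model of pattern-based topic selection; no Kafka or Flink calls.
class TopicPatternDemo {
    // Returns the topics whose full name matches the pattern
    // (assumption: whole-name match, not a substring search).
    static List<String> matching(Pattern pattern, List<String> topics) {
        List<String> selected = new ArrayList<>();
        for (String topic : topics) {
            if (pattern.matcher(topic).matches()) {
                selected.add(topic);
            }
        }
        return selected;
    }
}
```

For example, `TopicPatternDemo.matching(Pattern.compile("app\\..*"), List.of("app.orders", "app.users", "audit"))` selects `app.orders` and `app.users` but not `audit`.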
Is it possible to create new side output tags and sinks at runtime, from inside the process function? That is, when I encounter a new Kafka topic, can I create a side output, add it to the stream, and then attach a new sink to that side output?
The more I think about it, I assume this is not possible.
My alternative plan is to use a Kafka connector and build the list of Kafka topics in the 'before DataStream definition' step above. In that case I would have to restart the job to consume from new topics.
I could have thousands of topics, which is why I want to dynamically define them.