开发者

FLINK left join three DataStreams using cogroup

开发者 https://www.devze.com 2022-12-07 17:17 出处:网络
I\'m trying to merge three stream into a single stream. Tried union but was unable to proceed as the schema\'s are different and if I merge the schema it becomes too large.

I'm trying to merge three stream into a single stream. Tried union but was unable to proceed as the schema's are different and if I merge the schema it becomes too large.

So, I'm using **cogroup ** to do a left join and return a tuple of three streams.

`
DataStream<Tuple3<Schema1, Schema2, Schema3>> mergeJoin = stream1.coGroup(stream2)
        .where(genericRecord -> {return   SchemaUtils.getKey1(genericRecord);})
        .equalTo(genericRecord -> {return   SchemaUtils.getKey2(genericRecord);})
        .window(TumblingProcessingTimeWindows.of(Time.seconds(3)))
        .apply(new LeftOuterJoin1())
        .join(stream3)
        .where(tuple_Schema1_Schema2->{return   SchemaUtils.getKey1(tuple_Schema1_Schema2);})
        .equalTo( genericRecord->{ return SchemaUtils.getKey3(genericRecord);})
        .window(TumblingProcessingTimeWindows.of(Time.seconds(6)))
        .apply(new LeftOuterJoin2());
`
public static class LeftOuterJoin2 implements CoGroupFunction<Tuple2<schema1, schema2>, GenericRecord, Tuple3<chema1, schema2, schema3>> {

        @Override
        public void 开发者_Python百科coGroup(Iterable<Tuple2<SCHEMA1, SCHEMA2>> iterable, Iterable<GenericRecord> iterable1, Collector<Tuple3<SCHEMA1, SCHEMA2, SCHEMA3>> collector) throws Exception {
            final SCHEMA3 NULL_ELEMENT = null;
            ObjectMapper mapper = new ObjectMapper();
            for (Tuple2<SCHEMA1, SCHEMA2> leftElem : iterable) {
                boolean hadElements = false;
                for (GenericRecord rightElem : iterable1) {
                    SCHEMA1 schema1_data = leftElem.f0;
                    SCHEMA2 schema2_data = leftElem.f1;
                    SCHEMA3 schema3_data = mapper.readValue(rightElem.toString(), SCHEMA3.class);
                    collector.collect(new Tuple3<>(schema1_data, schema2_data,schema3_data));
                    hadElements = true;
                }
                if (!hadElements) {
                    SCHEMA1 schema1_data = leftElem.f0;
                    SCHEMA2 schema2_data = leftElem.f1;
                    collector.collect(new Tuple3<>(schema1_data, schema2_data,NULL_ELEMENT));
                }
            }
        }
    }


While merging the first two streams of tuple with the third on **apply(new LeftOuterJoin2())** \ i'm getting 
> 
> Cannot resolve method 'apply(LeftOuterJoin2)' error.

Expecting1: A tuple of three streams.
Expecting2: Number of records should be equal to SCHEMA1 count.
0

精彩评论

暂无评论...
验证码 换一张
取 消