Flink-使用合流操作进行实时对账需求的实现-创新互联

学Flink第八章多流转换的时候,进行合流操作.connect()使用到了第九章状态编程的知识,感觉总体不是很清晰,因此学完状态编程后现在进行重温并细化一些细节

创新互联公司专注于中大型企业的网站设计、成都网站设计和网站改版、网站营销服务,追求商业策划与数据分析、创意艺术与技术开发的融合,累计客户超过千家,服务满意度达97%。帮助广大客户顺利对接上互联网浪潮,准确优选出符合自己需要的互联网运用,我们将一直专注品牌网站制作和互联网程序开发,在前进的路上,与客户一起成长!
  1. 业务背景

在这里插入图片描述

  • 步骤一:

用户进行支付的时候,后台是需要调用第三方服务平台进行服务,即用户支付请求,页面将会跳转到第三方支付平台支付

  • 步骤二:

用户进行支付之后,第三方支付平台给到用户前端支出反馈,并且给我们平台发送用户已经付款的消息

  • 步骤三:

第三方支付平台需要将钱再转入到我们平台账户

  1. 出现的问题以及需求
  • 问题

如果进行到图中④,如果发生数据丢失,那么用户已经支付的消息无法传达给到后台,而后不能关闭订单

  • 需求

因此需要进行实时对账操作,即用户提交的支付请求(客户端),以及第三方支付平台给到的请求(三方端),两者可以当成两条流

  • 结果

如果进行两条流的操作后不匹配,那么将进行预警

  1. 一些细节考虑
  • 两个流都给他标上时间戳(使用watermark标志)

  • 使用状态编程保存状态以及设置定时器,来进行两条流的连接以及等待

    • 如果对方流中有我流的数据,那么直接输出成功;如果没有则更新我流状态,注册定时器等待另一个流

    • 然后用ontimer()触发定时器:判断条件如果两条流中还有状态没被清空,说明没匹配上

  1. 上代码
  • 代码
public class BillCheckExample {public static void main(String[] args) throws Exception{StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);


        //来自app的支付日志
        SingleOutputStreamOperator>appStream = env.fromElements(
                Tuple3.of("order-1", "app", 1000L),
                Tuple3.of("order-2", "app", 2000L),
                Tuple3.of("order-3", "app", 3500L)
        ).assignTimestampsAndWatermarks(WatermarkStrategy.>forBoundedOutOfOrderness(Duration.ZERO)
                .withTimestampAssigner(new SerializableTimestampAssigner>() {@Override
                    public long extractTimestamp(Tuple3element, long recordTimestamp) {return element.f2;
                    }
                })
        );
        //来自第三方平台的支付日志
        SingleOutputStreamOperator>thirdpartStream = env.fromElements(
                Tuple4.of("order-1", "third-party", "success", 3000L),
                Tuple4.of("order-3", "third-party", "success", 4000L)
        ).assignTimestampsAndWatermarks(WatermarkStrategy.
     >forBoundedOutOfOrderness(Duration.ZERO)
                .withTimestampAssigner(new SerializableTimestampAssigner>() {@Override
                    public long extractTimestamp(Tuple4element, long recordTimestamp) {return element.f3;
                    }
                })
        );

        //检测同一支付单在两条流中是否匹配,等待一段时间后,不匹配就报警
//        //这种也可以
//        appStream.keyBy(data->data.f0)
//                .connect(thirdpartStream.keyBy(data ->data.f0));
//
        appStream.connect(thirdpartStream)
                        .keyBy(data->data.f0,data->data.f0)
                        .process(new OrderMatchResult())
                        .print();


        env.execute();
    }
    //自定义实现CoFunction
    public static class OrderMatchResult extends CoProcessFunction,
                                                Tuple4,String>{//定义状态变量,用来保存已经到达的事件
        private ValueState>appEventState;
        private ValueState>thirdPartyEventState;

        //运行上下文环境中获取状态
        @Override
        public void open(Configuration parameters) throws Exception {appEventState = getRuntimeContext().getState(
                    new ValueStateDescriptor>("app-event", Types.TUPLE(Types.STRING,Types.STRING,Types.LONG))
            );
        thirdPartyEventState = getRuntimeContext().getState(
                new ValueStateDescriptor>("thirdparty-event", Types.TUPLE(Types.STRING, Types.STRING, Types.STRING,Types.LONG))
        );
        }

        @Override
        public void processElement1(Tuple3value, CoProcessFunction, Tuple4, String>.Context ctx, Collectorout) throws Exception {//来的是app event,看另一条流中事件是否来过
            if(thirdPartyEventState.value()!=null){out.collect("对账成功:"+value+" "+thirdPartyEventState.value());
                //清空状态
                thirdPartyEventState.clear();
            }else{//如果没来就等待,并且更新状态
                appEventState.update(value);
                //注册一个5秒后的定时器,开始等待另一条的事件
                ctx.timerService().registerEventTimeTimer(value.f2+5000L);
            }

        }

        @Override
        public void processElement2(Tuple4value, CoProcessFunction, Tuple4, String>.Context ctx, Collectorout) throws Exception {//来的是app event,看另一条流中事件是否来过
            if(appEventState.value()!=null){out.collect("对账成功:"+appEventState.value()+" "+value);
                //清空状态
                appEventState.clear();
            }else{//如果没来就等待,并且更新状态
                thirdPartyEventState.update(value);
                //注册一个5秒后的定时器,开始等待另一条的事件
                ctx.timerService().registerEventTimeTimer(value.f3);
            }
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collectorout) throws Exception {//定时器触发,判断状态,如果某个状态不为空,说明另一条中事件没来
            //并且不会存在两个都不为空,因为其中一个不为空后会被清除
            //没有没清空表示失败
            if(appEventState.value()!=null){out.collect("对账失败:"+appEventState.value()+" "+"第三方支付平台信息未到");
            }
            if(thirdPartyEventState.value()!=null){out.collect("对账失败:"+thirdPartyEventState.value()+" "+"APP信息信息未到");
            }
            //清空所有数据
            appEventState.clear();
            thirdPartyEventState.clear();
        }
    }

}
  • 结果
对账成功:(order-1,app,1000) (order-1,third-party,success,3000)
对账成功:(order-3,app,3500) (order-3,third-party,success,4000)
对账失败:(order-2,app,2000) 第三方支付平台信息未到

你是否还在寻找稳定的海外服务器提供商?创新互联www.cdcxhl.cn海外机房具备T级流量清洗系统配攻击溯源,准确流量调度确保服务器高可用性,企业级服务器适合批量采购,新人活动首月15元起,快前往官网查看详情吧


网站栏目:Flink-使用合流操作进行实时对账需求的实现-创新互联
网站链接:http://csdahua.cn/article/ihjpe.html
扫二维码与项目经理沟通

我们在微信上24小时期待你的声音

解答本文疑问/技术咨询/运营咨询/技术建议/互联网交流