09、Flink实战:DataStream之功能更丰富的Transformation算子RichMapFunction

Flink还提供了功能更丰富的Transformation实现接口。

RichFunction除了提供原来MapFunction的方法之外,还提供open、close、getRuntimeContext和setRuntimeContext方法,这些功能可用于参数化函数(传递参数)、创建和完成本地状态、访问广播变量,以及访问运行时信息和有关迭代的信息。

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.HashMap;
import java.util.Map;

public class RichMapFunctionReview {
    public static void main(String[] args) throws Exception{
        //1. 创建流计算运行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //2. 从Socket读取数据做为DataStream
        DataStreamSource<String> lines = env.socketTextStream("192.168.***.***", 8888);

        //3. Transformatioin转换
        //3.1 flatMap将一行字符串进行切分并压平
        SingleOutputStreamOperator<Tuple2<String, Integer>> telNums = lines.flatMap(new RichFlatMapFunction<String, Tuple2<String,Integer>>() {
            @Override
            public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                String[] telNum = s.split(",");
                for (int i = 0; i < telNum.length; i++) {
                    collector.collect(Tuple2.of(telNum[i],1));
                }
            }
        });

        //3.2 对flatMap之后的数据进行过滤
        SingleOutputStreamOperator<Tuple2<String, Integer>> filtered = telNums.filter(new FilterFunction<Tuple2<String, Integer>>() {
            @Override
            public boolean filter(Tuple2<String, Integer> tp2) throws Exception {
                if (tp2.f0.startsWith("0")) {
                    return true;
                }
                return false;    //过滤掉不是以0开头的
            }
        });

        //3.3 对聚合后的结果进行map操作,关联出归属地名称
        SingleOutputStreamOperator<Tuple3<String, String, Integer>> glgsd = filtered.map(new RichMapFunction<Tuple2<String, Integer>, Tuple3<String, String, Integer>>() {
            transient HashMap<String,String> hmgsd;

            // open()方法,在构造方法之后,map方法之前执行一次,Configuration可以拿到全局配置。
            // 大部分情况用来初始化链接,或者初始化或恢复state
            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                hmgsd = new HashMap<>();
                hmgsd.put("010","北京");
                hmgsd.put("0571","杭州市");
                hmgsd.put("0551","合肥市");
                hmgsd.put("0991","乌鲁木齐");
                hmgsd.put("0351","太原市");
            }

            @Override
            public Tuple3<String, String, Integer> map(Tuple2<String,Integer> tp2) throws Exception {
                String gsdName = null;
                // 获取号码归属地
                for(String key:hmgsd.keySet()){
                    if(tp2.f0.startsWith(key)){
                        gsdName = hmgsd.get(key);
                    }
                }
                if(gsdName == null){
                    gsdName = "未知";
                }
                return Tuple3.of(tp2.f0,gsdName,tp2.f1);
            }

            // 销毁之前执行一次,通常是做资源的释放
            @Override
            public void close() throws Exception {
                super.close();
            }
        });

        //3.4 对过滤后的数据进行分组统计
        SingleOutputStreamOperator<Tuple3<String,String, Integer>> sumed = glgsd.keyBy(1).sum(2);

        //4 sink输出结果
        sumed.print();

        //5. 执行程序
        env.execute("RichMapFunctionReview");

    }
}

输入测试数据如下:

*

输出结果:

*

版权声明:本文不是「本站」原创文章,版权归原作者所有 | 原文地址: