Flink还提供了功能更丰富的Transformation实现接口。
RichFunction除了提供原来MapFunction的方法之外,还提供open、close、getRuntimeContext和setRuntimeContext方法。这些方法可用于参数化函数(传递参数)、创建和终结本地状态、访问广播变量,以及获取运行时信息和迭代相关的信息。
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import java.util.HashMap;
/**
 * Demo streaming job showing the "rich" function variants of the DataStream API.
 *
 * <p>Pipeline: read comma-separated telephone numbers from a socket, split them into
 * {@code (number, 1)} pairs, keep only numbers starting with "0", attach the area name
 * resolved from the number's area-code prefix, then key by area name and keep a running
 * sum of the counts, printing every update.
 */
public class RichMapFunctionReview {

    /**
     * Resolves the area name for a telephone number by its area-code prefix.
     *
     * <p>When several prefixes in the table match, the longest one wins, so the result
     * never depends on the iteration order of the hash map.
     *
     * @param prefixToArea lookup table from area-code prefix to area name
     * @param telNum the telephone number to resolve
     * @return the area name of the longest matching prefix, or "未知" when none matches
     */
    private static String lookupAreaName(HashMap<String, String> prefixToArea, String telNum) {
        String bestPrefix = null;
        for (String prefix : prefixToArea.keySet()) {
            boolean longerThanBest = bestPrefix == null || prefix.length() > bestPrefix.length();
            if (longerThanBest && telNum.startsWith(prefix)) {
                bestPrefix = prefix;
            }
        }
        return bestPrefix == null ? "未知" : prefixToArea.get(bestPrefix);
    }

    /**
     * Builds the pipeline and submits it for execution.
     *
     * @param args command-line arguments (not used)
     * @throws Exception propagated from the Flink environment when the job fails
     */
    public static void main(String[] args) throws Exception {
        // 1. Create the streaming execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. Read text lines from a socket as the source DataStream.
        DataStreamSource<String> lines = env.socketTextStream("192.168.***.***", 8888);

        // 3. Transformations.
        // 3.1 flatMap: split each line on "," and emit one (number, 1) pair per token.
        SingleOutputStreamOperator<Tuple2<String, Integer>> telNums = lines.flatMap(
                new RichFlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String s, Collector<Tuple2<String, Integer>> collector)
                            throws Exception {
                        for (String telNum : s.split(",")) {
                            collector.collect(Tuple2.of(telNum, 1));
                        }
                    }
                });

        // 3.2 filter: keep only numbers that start with "0"; everything else is dropped.
        SingleOutputStreamOperator<Tuple2<String, Integer>> filtered = telNums.filter(
                new FilterFunction<Tuple2<String, Integer>>() {
                    @Override
                    public boolean filter(Tuple2<String, Integer> tp2) throws Exception {
                        return tp2.f0.startsWith("0");
                    }
                });

        // 3.3 map: enrich each filtered record with the area name of its area code,
        //     producing (number, areaName, count).
        SingleOutputStreamOperator<Tuple3<String, String, Integer>> glgsd = filtered.map(
                new RichMapFunction<Tuple2<String, Integer>, Tuple3<String, String, Integer>>() {
                    // Area-code prefix -> area name. Populated in open(), hence transient:
                    // the table is rebuilt there rather than carried in the serialized function.
                    transient HashMap<String, String> hmgsd;

                    // open() runs once after construction and before the first map() call.
                    // It is typically used to initialise connections or to create/restore state;
                    // the Configuration argument exposes the configuration passed to the function.
                    @Override
                    public void open(Configuration parameters) throws Exception {
                        super.open(parameters);
                        hmgsd = new HashMap<>();
                        hmgsd.put("010", "北京");
                        hmgsd.put("0571", "杭州市");
                        hmgsd.put("0551", "合肥市");
                        hmgsd.put("0991", "乌鲁木齐");
                        hmgsd.put("0351", "太原市");
                    }

                    @Override
                    public Tuple3<String, String, Integer> map(Tuple2<String, Integer> tp2)
                            throws Exception {
                        // Resolve the area name from the number's prefix ("未知" when unknown).
                        String gsdName = lookupAreaName(hmgsd, tp2.f0);
                        return Tuple3.of(tp2.f0, gsdName, tp2.f1);
                    }

                    // close() runs once before the function is disposed; usually releases resources.
                    @Override
                    public void close() throws Exception {
                        super.close();
                    }
                });

        // 3.4 Group the enriched records by area name (tuple field 1) and keep a running
        //     sum of the count (tuple field 2).
        // NOTE(review): positional keyBy(int) is deprecated in newer Flink releases in favour
        // of a KeySelector — confirm against the Flink version this project builds with.
        SingleOutputStreamOperator<Tuple3<String, String, Integer>> sumed = glgsd.keyBy(1).sum(2);

        // 4. Sink: print every updated result.
        sumed.print();

        // 5. Submit the job.
        env.execute("RichMapFunctionReview");
    }
}
输入测试数据如下:
输出结果:
版权声明:本文不是「本站」原创文章,版权归原作者所有 | 原文地址: