文章目录
- 一.代码准备
- 二.运行程序
- 参考:
一.代码准备
org.flink.beans.SensorReading
package org.flink.beans;
/**
* @author 只是甲
* @date 2021-08-30
* @remark 传感器温度读数的数据类型
*/
public class SensorReading {
// 属性:id,时间戳,温度值
private String id;
private Long timestamp;
private Double temperature;
public SensorReading() {
}
public SensorReading(String id, Long timestamp, Double temperature) {
this.id = id;
this.timestamp = timestamp;
this.temperature = temperature;
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public Long getTimestamp() {
return timestamp;
}
public void setTimestamp(Long timestamp) {
this.timestamp = timestamp;
}
public Double getTemperature() {
return temperature;
}
public void setTemperature(Double temperature) {
this.temperature = temperature;
}
@Override
public String toString() {
return "SensorReading{" +
"id='" + id + '\'' +
", timestamp=" + timestamp +
", temperature=" + temperature +
'}';
}
}
SourceTest4_UDF
package org.example;
import org.flink.beans.SensorReading;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import java.util.HashMap;
import java.util.Random;
/**
* @author 只是甲
* @date 2021-08-31
* @remark Flink之自定义Source
*/
public class SourceTest4_UDF {
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 从文件读取数据
DataStream<SensorReading> dataStream = env.addSource( new MySensorSource() );
// 打印输出
dataStream.print();
env.execute();
}
// 实现自定义的SourceFunction
public static class MySensorSource implements SourceFunction<SensorReading>{
// 定义一个标识位,用来控制数据的产生
private boolean running = true;
@Override
public void run(SourceContext<SensorReading> ctx) throws Exception {
// 定义一个随机数发生器
Random random = new Random();
// 设置10个传感器的初始温度
HashMap<String, Double> sensorTempMap = new HashMap<>();
for( int i = 0; i < 10; i++ ){
sensorTempMap.put("sensor_" + (i+1), 60 + random.nextGaussian() * 20);
}
while (running){
for( String sensorId: sensorTempMap.keySet() ){
// 在当前温度基础上随机波动
Double newtemp = sensorTempMap.get(sensorId) + random.nextGaussian();
sensorTempMap.put(sensorId, newtemp);
ctx.collect(new SensorReading(sensorId, System.currentTimeMillis(), newtemp));
}
// 控制输出频率
Thread.sleep(2000L);
}
}
@Override
public void cancel() {
running = false;
}
}}
二.运行程序
运行程序截图:
参考:
1、 https://www.bilibili.com/video/BV1qy4y1q728;
2. https://ashiamd.github.io/docsify-notes/#/study/BigData/Flink/%E5%B0%9A%E7%A1%85%E8%B0%B7Flink%E5%85%A5%E9%97%A8%E5%88%B0%E5%AE%9E%E6%88%98-%E5%AD%A6%E4%B9%A0%E7%AC%94%E8%AE%B0?id=_521-%e4%bb%8e%e9%9b%86%e5%90%88%e8%af%bb%e5%8f%96%e6%95%b0%e6%8d%ae
版权声明:本文不是「本站」原创文章,版权归原作者所有 | 原文地址: