说明

这篇文章只是让你入门接触到流计算的强大。

1
http://www.mirrorservice.org/sites/ftp.apache.org/flink/flink-1.8.0/flink-1.8.0-bin-scala_2.12.tgz

这里以windows为例子,解压后双击start-cluster.bat启动集群模式

alt

正常情况下,现在访问http://localhost:8081就能进入管理界面
alt

好,写下来我们编写我们的代码,我们可以从官方的工程开始,
生成Java项目的工程有两种方式,一种是maven生成

1
2
3
4
5
6
7
8
9
mvn archetype:generate								\
-DarchetypeGroupId=org.apache.flink \
-DarchetypeArtifactId=flink-quickstart-java \
-DarchetypeVersion=${1:-1.8.0} \
-DgroupId=org.myorg.quickstart \
-DartifactId=$PACKAGE \
-Dversion=0.1 \
-Dpackage=org.myorg.quickstart \
-DinteractiveMode=false

还有一种是使用官方提供的脚本:

1
curl https://flink.apache.org/q/quickstart.sh | bash

推荐使用第二种,方面快捷,官方负责维护,当然你执行的时候要看看自己flink的版本,因为脚本生成的工程pom.xml中flink的依赖会是最新的,可能不兼容你下载的flink版本。

我们把生成的工程导入IDEA,她的工程目录如下:
alt

好了我们选择StreamingJob来编写我们的代码,实现监控维基百科词条的更新,不过在此之前我们需要引入维基百科的详情资源。

1
2
3
4
5
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-wikiedits_2.11</artifactId>
<version>${flink.version}</version>
</dependency>

然一顿骚操作之后编写好我们的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
public class StreamingJob {

public static void main(String[] args) throws Exception {
// 环境信息
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStreamSource<WikipediaEditEvent> edits = env.addSource(new WikipediaEditsSource());

//以用户名为key分组
KeyedStream<WikipediaEditEvent, String> keyedStream = edits.keyBy((KeySelector<WikipediaEditEvent, String>) WikipediaEditEvent::getUser);

//时间窗口为5秒
SingleOutputStreamOperator<Tuple3<String, Integer, StringBuilder>> operator = keyedStream.timeWindow(Time.seconds(15)).aggregate(new AggregateFunction<WikipediaEditEvent, Tuple3<String, Integer, StringBuilder>, Tuple3<String, Integer, StringBuilder>>() {
@Override
public Tuple3<String, Integer, StringBuilder> createAccumulator() {
//创建ACC
return new Tuple3<>("", 0, new StringBuilder());
}

@Override
public Tuple3<String, Integer, StringBuilder> add(WikipediaEditEvent wikipediaEditEvent, Tuple3<String, Integer, StringBuilder> stringIntegerStringBuilderTuple3) {
StringBuilder sb = stringIntegerStringBuilderTuple3.f2;

//如果是第一条记录,就加个"修改字符数量 :"作为前缀,
//如果不是第一条记录,就用空格作为分隔符
if(StringUtils.isBlank(sb.toString())){
sb.append("修改字符数量 : ");
}else {
sb.append(" ");
}

return new Tuple3<>(wikipediaEditEvent.getUser(),
wikipediaEditEvent.getByteDiff() + stringIntegerStringBuilderTuple3.f1,
sb.append(wikipediaEditEvent.getByteDiff()));
}

@Override
public Tuple3<String, Integer, StringBuilder> getResult(Tuple3<String, Integer, StringBuilder> stringIntegerStringBuilderTuple3) {
return stringIntegerStringBuilderTuple3;
}

@Override
public Tuple3<String, Integer, StringBuilder> merge(Tuple3<String, Integer, StringBuilder> stringIntegerStringBuilderTuple3, Tuple3<String, Integer, StringBuilder> acc1) {
//合并窗口的场景才会用到
return new Tuple3<>(stringIntegerStringBuilderTuple3.f0,
stringIntegerStringBuilderTuple3.f1 + acc1.f1, stringIntegerStringBuilderTuple3.f2.append(acc1.f2));
}
});

// 将每个key的聚合结果单独转为字符串,实际应用中这里可以发送到kafka、mysql或者redis中
operator.map((MapFunction<Tuple3<String, Integer, StringBuilder>, String>) Tuple3::toString).print();

// 执行
env.execute("Flink Streaming Java API Skeleton Start");
}
}

然后我们使用maven的命令打包

1
mvn clean package -U

打包好之后回到我们Flink的管理界面上传jar包

alt

填写类路径,然后提交
alt

到这我们已经看到程序已经开始执行了
alt

然后打开我们启动start-cluster.bat是弹出的控制台窗口,是不是发现程序已经完美运行了呢
alt

具体的API使用请查看官方文档。