说明
这篇文章只是让你入门接触到流计算的强大。
下载安装Apache Flink
1
| http://www.mirrorservice.org/sites/ftp.apache.org/flink/flink-1.8.0/flink-1.8.0-bin-scala_2.12.tgz
|
这里以windows为例子,解压后双击start-cluster.bat启动集群模式
正常情况下,现在访问http://localhost:8081就能进入管理界面
好,写下来我们编写我们的代码,我们可以从官方的工程开始,
生成Java项目的工程有两种方式,一种是maven生成
1 2 3 4 5 6 7 8 9
| mvn archetype:generate \ -DarchetypeGroupId=org.apache.flink \ -DarchetypeArtifactId=flink-quickstart-java \ -DarchetypeVersion=${1:-1.8.0} \ -DgroupId=org.myorg.quickstart \ -DartifactId=$PACKAGE \ -Dversion=0.1 \ -Dpackage=org.myorg.quickstart \ -DinteractiveMode=false
|
还有一种是使用官方提供的脚本:
1
| curl https://flink.apache.org/q/quickstart.sh | bash
|
推荐使用第二种,方面快捷,官方负责维护,当然你执行的时候要看看自己flink的版本,因为脚本生成的工程pom.xml中flink的依赖会是最新的,可能不兼容你下载的flink版本。
我们把生成的工程导入IDEA,她的工程目录如下:
好了我们选择StreamingJob来编写我们的代码,实现监控维基百科词条的更新,不过在此之前我们需要引入维基百科的详情资源。
1 2 3 4 5
| <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-wikiedits_2.11</artifactId> <version>${flink.version}</version> </dependency>
|
然一顿骚操作之后编写好我们的代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56
| public class StreamingJob {
public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<WikipediaEditEvent> edits = env.addSource(new WikipediaEditsSource());
KeyedStream<WikipediaEditEvent, String> keyedStream = edits.keyBy((KeySelector<WikipediaEditEvent, String>) WikipediaEditEvent::getUser);
SingleOutputStreamOperator<Tuple3<String, Integer, StringBuilder>> operator = keyedStream.timeWindow(Time.seconds(15)).aggregate(new AggregateFunction<WikipediaEditEvent, Tuple3<String, Integer, StringBuilder>, Tuple3<String, Integer, StringBuilder>>() { @Override public Tuple3<String, Integer, StringBuilder> createAccumulator() { return new Tuple3<>("", 0, new StringBuilder()); }
@Override public Tuple3<String, Integer, StringBuilder> add(WikipediaEditEvent wikipediaEditEvent, Tuple3<String, Integer, StringBuilder> stringIntegerStringBuilderTuple3) { StringBuilder sb = stringIntegerStringBuilderTuple3.f2;
if(StringUtils.isBlank(sb.toString())){ sb.append("修改字符数量 : "); }else { sb.append(" "); }
return new Tuple3<>(wikipediaEditEvent.getUser(), wikipediaEditEvent.getByteDiff() + stringIntegerStringBuilderTuple3.f1, sb.append(wikipediaEditEvent.getByteDiff())); }
@Override public Tuple3<String, Integer, StringBuilder> getResult(Tuple3<String, Integer, StringBuilder> stringIntegerStringBuilderTuple3) { return stringIntegerStringBuilderTuple3; }
@Override public Tuple3<String, Integer, StringBuilder> merge(Tuple3<String, Integer, StringBuilder> stringIntegerStringBuilderTuple3, Tuple3<String, Integer, StringBuilder> acc1) { return new Tuple3<>(stringIntegerStringBuilderTuple3.f0, stringIntegerStringBuilderTuple3.f1 + acc1.f1, stringIntegerStringBuilderTuple3.f2.append(acc1.f2)); } });
operator.map((MapFunction<Tuple3<String, Integer, StringBuilder>, String>) Tuple3::toString).print();
env.execute("Flink Streaming Java API Skeleton Start"); } }
|
然后我们使用maven的命令打包
打包好之后回到我们Flink的管理界面上传jar包
填写类路径,然后提交
到这我们已经看到程序已经开始执行了
然后打开我们启动start-cluster.bat是弹出的控制台窗口,是不是发现程序已经完美运行了呢
具体的API使用请查看官方文档。