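Overview
Storm is a distributed, real-time big-data processing framework open-sourced by Twitter. A growing number of scenarios cannot tolerate the high latency of Hadoop MapReduce. Examples include website analytics, recommendation systems, alerting systems and financial systems (high-frequency trading, stocks). As a result, real-time big-data processing (stream computing) is used more and more widely and has become one of the hottest areas of distributed computing. Storm is one of the leading, mainstream stream-processing frameworks.
The figure below shows how data flows through Storm.
We control how data is transferred and processed by writing custom Spouts and Bolts.
Here Kafka serves as the input source. A scheduled task keeps producing mock data, Storm reads that data from Kafka and processes it, and the results are stored in Redis.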
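The post does not show the scheduled task that produces the mock data. The following is a rough, JDK-only sketch under stated assumptions, not the repository's code. It builds one mock order per period and hands it to a sink.

- The class name MockOrderProducer is hypothetical.
- The JSON field names productId, shopId and payPrice are inferred from the PaymentInfo getters that the bolt uses later on.
- Their numeric types and value ranges are invented.
- In the Spring application the sink would presumably be something like `json -> kafkaTemplate.send("order", json)`, with "order" being the topic from `custom.storm.kafka.topics[0]`.

```java
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical mock-order generator; the real project presumably uses @Scheduled plus KafkaTemplate. */
final class MockOrderProducer {

    private final Random random;
    private final Consumer<String> sink; // stand-in for sending to the Kafka topic "order"

    MockOrderProducer(Random random, Consumer<String> sink) {
        this.random = random;
        this.sink = sink;
    }

    /** Builds one mock order as JSON; field names mirror PaymentInfo's getters, values are invented. */
    String nextPayment() {
        int productId = 1 + random.nextInt(10);     // assumption: 10 products
        int shopId = 1 + random.nextInt(5);         // assumption: 5 shops
        long payPrice = 100 + random.nextInt(9900); // assumption: integer price, e.g. in cents
        return "{\"productId\":" + productId
                + ",\"shopId\":" + shopId
                + ",\"payPrice\":" + payPrice + "}";
    }

    /** Emits one mock order every periodMillis until the returned scheduler is shut down. */
    ScheduledExecutorService start(long periodMillis) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(() -> sink.accept(nextPayment()), 0, periodMillis, TimeUnit.MILLISECONDS);
        return scheduler;
    }

    public static void main(String[] args) throws InterruptedException {
        MockOrderProducer producer =
                new MockOrderProducer(new Random(), json -> System.out.println("order <- " + json));
        ScheduledExecutorService scheduler = producer.start(500);
        Thread.sleep(2000); // let a few orders through, then stop
        scheduler.shutdown();
    }
}
```

Keeping the sink as a plain Consumer keeps the generator testable without a Kafka broker.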
Dependencies
storm-core and Spring Boot bring in conflicting SLF4J logging bindings, so a fair number of dependencies have to be excluded:
```xml
<properties>
    <java.version>1.8</java.version>
    <fastjson.version>1.2.59</fastjson.version>
    <log4j-to-slf4j.version>2.0.2</log4j-to-slf4j.version>
    <storm.version>1.1.1</storm.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-redis</artifactId>
        <exclusions>
            <exclusion>
                <artifactId>log4j-over-slf4j</artifactId>
                <groupId>org.slf4j</groupId>
            </exclusion>
            <exclusion>
                <artifactId>logback-classic</artifactId>
                <groupId>ch.qos.logback</groupId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
        <exclusions>
            <!-- drop Spring Boot's default Logback setup -->
            <exclusion>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-logging</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.apache.logging.log4j</groupId>
                <artifactId>log4j-to-slf4j</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-configuration-processor</artifactId>
        <optional>true</optional>
    </dependency>

    <!-- provided: the Storm cluster supplies storm-core at runtime -->
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-core</artifactId>
        <version>${storm.version}</version>
        <scope>provided</scope>
        <exclusions>
            <exclusion>
                <groupId>org.apache.logging.log4j</groupId>
                <artifactId>log4j-slf4j-impl</artifactId>
            </exclusion>
            <exclusion>
                <groupId>javax.servlet</groupId>
                <artifactId>servlet-api</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-kafka-client</artifactId>
        <version>${storm.version}</version>
    </dependency>

    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-to-slf4j</artifactId>
        <version>${log4j-to-slf4j.version}</version>
        <exclusions>
            <exclusion>
                <groupId>org.apache.logging.log4j</groupId>
                <artifactId>log4j-slf4j-impl</artifactId>
            </exclusion>
        </exclusions>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-log4j2</artifactId>
    </dependency>

    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>${fastjson.version}</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>
```
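When the exclusions are incomplete, SLF4J 1.7.x warns "Class path contains multiple SLF4J bindings" at startup. It detects bindings by looking up the resource org/slf4j/impl/StaticLoggerBinder.class, which both logback-classic and log4j-slf4j-impl contain. The JDK-only helper below performs the same lookup, so you can see which jars still provide a binding. The class name Slf4jBindingCheck is hypothetical, and the helper only applies to SLF4J 1.7-style bindings.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URL;
import java.util.Collections;
import java.util.List;

/** Lists the SLF4J 1.7.x bindings visible to a class loader (hypothetical helper). */
final class Slf4jBindingCheck {

    // Every SLF4J 1.7.x binding (logback-classic, log4j-slf4j-impl, ...) ships this class
    static final String BINDER = "org/slf4j/impl/StaticLoggerBinder.class";

    private Slf4jBindingCheck() {
    }

    /** Returns one URL per jar (or directory) that provides a StaticLoggerBinder. */
    static List<URL> findBindings(ClassLoader loader) {
        try {
            return Collections.list(loader.getResources(BINDER));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        List<URL> bindings = findBindings(Thread.currentThread().getContextClassLoader());
        System.out.println("SLF4J bindings found: " + bindings.size());
        for (URL url : bindings) {
            System.out.println("  " + url);
        }
        if (bindings.size() > 1) {
            System.out.println("More than one binding: revisit the <exclusions> in the pom.");
        }
    }
}
```

Running its main method from inside the project (for example from the IDE) prints the jar each binding comes from, which tells you which dependency still needs an exclusion.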
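Packaging also differs from a regular Spring Boot project. The jar produced by spring-boot-maven-plugin nests its dependencies under BOOT-INF/lib, where only Spring Boot's own launcher can read them. This project therefore builds a flat uber jar with maven-shade-plugin and leaves out storm-core, which the Storm cluster already provides: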
```xml
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.8.0</version>
            <configuration>
                <encoding>UTF-8</encoding>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-jar-plugin</artifactId>
            <version>2.6</version>
            <configuration>
                <archive>
                    <manifest>
                        <addClasspath>true</addClasspath>
                        <classpathPrefix>lib/</classpathPrefix>
                        <mainClass>com.yi.readboard.ReadBoardApplication</mainClass>
                    </manifest>
                </archive>
            </configuration>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.1.1</version>
            <dependencies>
                <!-- provides PropertiesMergingResourceTransformer -->
                <dependency>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                    <version>2.1.7.RELEASE</version>
                </dependency>
            </dependencies>
            <configuration>
                <keepDependenciesWithProvidedScope>true</keepDependenciesWithProvidedScope>
                <createDependencyReducedPom>true</createDependencyReducedPom>
                <filters>
                    <!-- drop signature files of signed jars; they are invalid in a merged jar -->
                    <filter>
                        <artifact>*:*</artifact>
                        <excludes>
                            <exclude>META-INF/*.SF</exclude>
                            <exclude>META-INF/*.DSA</exclude>
                            <exclude>META-INF/*.RSA</exclude>
                        </excludes>
                    </filter>
                </filters>
                <!-- already on the Storm cluster's classpath -->
                <artifactSet>
                    <excludes>
                        <exclude>org.slf4j:slf4j-api</exclude>
                        <exclude>javax.mail:javax.mail-api</exclude>
                        <exclude>org.apache.storm:storm-core</exclude>
                        <exclude>org.apache.storm:storm-kafka</exclude>
                        <exclude>org.apache.logging.log4j:log4j-slf4j-impl</exclude>
                    </excludes>
                </artifactSet>
            </configuration>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <transformers>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                <resource>META-INF/spring.handlers</resource>
                            </transformer>
                            <transformer implementation="org.springframework.boot.maven.PropertiesMergingResourceTransformer">
                                <resource>META-INF/spring.factories</resource>
                            </transformer>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                <resource>META-INF/spring.schemas</resource>
                            </transformer>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                <mainClass>com.yi.readboard.ReadBoardApplication</mainClass>
                            </transformer>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
```
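The spring.factories transformer matters because several jars (spring-boot and spring-boot-autoconfigure, for example) each ship their own META-INF/spring.factories. Without a transformer, only one copy survives in the shaded jar, and the auto-configurations listed in the other copies silently disappear.

The JDK-only sketch below mimics the merge rule with java.util.Properties: values of a repeated key are joined with a comma. It is an illustration under that assumption, not the plugin's source. The class name is hypothetical.

```java
import java.util.Properties;

/** Illustrates merging META-INF/spring.factories files from several jars into one. */
final class SpringFactoriesMerger {

    private final Properties merged = new Properties();

    /** Adds one jar's spring.factories; the values of a key seen before are appended after a comma. */
    void add(Properties factories) {
        for (String key : factories.stringPropertyNames()) {
            String value = factories.getProperty(key);
            String existing = merged.getProperty(key);
            merged.setProperty(key, existing == null ? value : existing + "," + value);
        }
    }

    Properties result() {
        return merged;
    }

    public static void main(String[] args) {
        String key = "org.springframework.boot.autoconfigure.EnableAutoConfiguration";
        Properties fromJarA = new Properties();
        fromJarA.setProperty(key, "com.example.AConfig");
        Properties fromJarB = new Properties();
        fromJarB.setProperty(key, "com.example.BConfig");

        SpringFactoriesMerger merger = new SpringFactoriesMerger();
        merger.add(fromJarA);
        merger.add(fromJarB);
        // Both auto-configuration classes are kept instead of one overwriting the other
        System.out.println(merger.result().getProperty(key));
    }
}
```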
Configuration file
```properties
# Kafka broker addresses (more than one is allowed)
spring.kafka.bootstrap-servers=node01:9092,node02:9092,node03:9092

# REDIS (RedisProperties)
# Redis database index (default 0)
spring.redis.database=0
# Redis server address
spring.redis.host=192.168.80.100
# Redis server port
spring.redis.port=6379

# Storm settings
# Storm topology name: leave empty for local mode, set it for cluster mode
custom.storm.kafka.name=CustomStorm
custom.storm.kafka.bootstrapServers=node01:9092,node02:9092,node03:9092
custom.storm.kafka.topics[0]=order
```
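The custom.storm.kafka.* keys are presumably bound to a typed class. The spring-boot-configuration-processor dependency suggests a bean annotated with `@ConfigurationProperties(prefix = "custom.storm.kafka")`, but the post does not show it.

The JDK-only sketch below performs that binding by hand to show what the keys map to. In particular, the indexed `topics[0]`, `topics[1]`, ... keys become a list in index order. The class and field names are hypothetical, and a real Spring bean would use getters and setters instead.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.TreeMap;

/** Hypothetical holder for the custom.storm.kafka.* keys, bound by hand so only the JDK is needed. */
final class StormKafkaProperties {

    private static final String PREFIX = "custom.storm.kafka.";
    private static final String TOPICS = PREFIX + "topics[";

    String name = "";        // empty means local mode
    String bootstrapServers;
    final List<String> topics = new ArrayList<>();

    static StormKafkaProperties bind(Properties props) {
        StormKafkaProperties bound = new StormKafkaProperties();
        bound.name = props.getProperty(PREFIX + "name", "");
        bound.bootstrapServers = props.getProperty(PREFIX + "bootstrapServers");
        // Collect topics[0], topics[1], ... sorted by index, like a List binding
        TreeMap<Integer, String> byIndex = new TreeMap<>();
        for (String key : props.stringPropertyNames()) {
            if (key.startsWith(TOPICS) && key.endsWith("]")) {
                int index = Integer.parseInt(key.substring(TOPICS.length(), key.length() - 1));
                byIndex.put(index, props.getProperty(key));
            }
        }
        bound.topics.addAll(byIndex.values());
        return bound;
    }

    /** Convenience overload that parses properties-file text first. */
    static StormKafkaProperties bind(String propertiesText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bind(props);
    }
}
```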
Code notes
There are two key points.
1. The application must not use Spring Boot's embedded Tomcat, so the startup class disables the web environment:
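```java
import org.springframework.boot.SpringApplication;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class SpringBootStormApplication {

    public static void main(String[] args) {
        SpringApplication app = new SpringApplication(SpringBootStormApplication.class);
        // Do not start the embedded web server (Tomcat)
        app.setWebApplicationType(WebApplicationType.NONE);
        app.run(args);
    }
}
```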
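2. Autowiring fields in a class that extends BaseBasicBolt may not work. Storm serializes the bolt when the topology is submitted and re-creates it on the worker by deserialization, outside Spring's control. Instead, a separate configuration class can receive the injected bean and expose it: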
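```java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.data.redis.RedisProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.core.StringRedisTemplate;

@Configuration
@EnableConfigurationProperties(RedisProperties.class)
public class RedisConfig {

    // Static holder so that Storm components, which Spring does not manage, can reach the template.
    // A static field is per JVM: it is only populated in a JVM where this Spring context has started.
    public static StringRedisTemplate stringRedisTemplate;

    @Autowired
    public void setStringRedisTemplate(StringRedisTemplate stringRedisTemplate) {
        RedisConfig.stringRedisTemplate = stringRedisTemplate;
    }
}
```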
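Storm ships spouts and bolts to the workers using plain Java serialization. The JDK-only sketch below simulates that round trip to show what happens to a field that was set in the submitting JVM.

- DemoBolt and its field are hypothetical stand-ins for a bolt holding a StringRedisTemplate.
- The field is transient because the template is not Serializable. Without transient, writeObject would throw NotSerializableException.
- In the original CountMoneyBolt the field is still null when the bolt is serialized, so that failure does not occur there.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

final class SerializationDemo {

    /** Hypothetical stand-in for a bolt: Serializable, with one non-serializable dependency. */
    static class DemoBolt implements Serializable {
        private static final long serialVersionUID = 1L;

        final String name;
        transient Object injected; // plays the role of an injected StringRedisTemplate

        DemoBolt(String name) {
            this.name = name;
        }

        /** Counterpart of prepare(): re-acquire the dependency on the worker side. */
        void prepare(Object fromStaticHolder) {
            this.injected = fromStaticHolder;
        }
    }

    /** Serializes and deserializes an object, as happens between topology submission and the worker. */
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        DemoBolt bolt = new DemoBolt("countMoneyBolt");
        bolt.injected = new Object();           // "injected" in the submitting JVM
        DemoBolt onWorker = roundTrip(bolt);
        System.out.println(onWorker.name + " injected=" + onWorker.injected); // countMoneyBolt injected=null
        onWorker.prepare("template from RedisConfig");
        System.out.println(onWorker.name + " injected=" + onWorker.injected);
    }
}
```

This is why the bolt fetches the template in prepare(), which runs on the worker after deserialization, instead of relying on field injection.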
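Then, in the class that extends BaseBasicBolt, override the parent's prepare() initialization method to pick up the dependencies it needs. This step is optional; you can also use the static field directly.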
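```java
import com.alibaba.fastjson.JSON;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Tuple;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;

import java.util.Map;

// PaymentInfo and RedisConfig are classes of this project

/**
 * Aggregates the dashboard statistics.
 *
 * @author huangwenyi
 * @date 2019-9-2
 */
@Service
public class CountMoneyBolt extends BaseBasicBolt {

    // transient: not shipped with the serialized bolt; assigned in prepare() on the worker
    private transient StringRedisTemplate stringRedisTemplate;

    /**
     * Initialization hook; Storm calls it once for each task of this bolt when the task starts.
     *
     * @param stormConf the Storm configuration of the worker
     * @param context   the topology context
     */
    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        this.stringRedisTemplate = RedisConfig.stringRedisTemplate;
    }

    /**
     * Receives the records emitted by the upstream KafkaSpout and stores the aggregates in Redis.
     *
     * @param input     the incoming tuple
     * @param collector the output collector (unused: this bolt emits nothing)
     */
    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        // Index 4 is the "value" field of storm-kafka-client's default record translator
        // (fields: topic, partition, offset, key, value)
        Object value = input.getValue(4);
        if (null != value && !StringUtils.isEmpty(value.toString())) {
            String jsonStr = value.toString();
            PaymentInfo paymentInfo = JSON.parseObject(jsonStr, PaymentInfo.class);

            // Total platform sales
            stringRedisTemplate.opsForValue().increment("order:total:price:date", paymentInfo.getPayPrice());
            // Number of users who placed an order on the platform today
            stringRedisTemplate.opsForValue().increment("order:total:user:date");
            // Number of items sold on the platform
            stringRedisTemplate.opsForValue().increment("order:num:user:date");

            // Note: product and shop keys share the "order:<id>:" prefix, so a productId equal to a shopId hits the same key
            // Total sales per product
            stringRedisTemplate.opsForValue().increment("order:" + paymentInfo.getProductId() + ":price:date", paymentInfo.getPayPrice());
            // Number of buyers per product
            stringRedisTemplate.opsForValue().increment("order:" + paymentInfo.getProductId() + ":user:date");
            // Units sold per product
            stringRedisTemplate.opsForValue().increment("order:" + paymentInfo.getProductId() + ":num:date");
            // Total sales per shop
            stringRedisTemplate.opsForValue().increment("order:" + paymentInfo.getShopId() + ":price:date", paymentInfo.getPayPrice());
            // Number of buyers per shop
            stringRedisTemplate.opsForValue().increment("order:" + paymentInfo.getShopId() + ":user:date");
            // Units sold per shop
            stringRedisTemplate.opsForValue().increment("order:" + paymentInfo.getShopId() + ":num:date");
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Terminal bolt: no output fields to declare
    }
}
```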
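To unit-test the key layout without a Redis server, the increments can be mirrored in memory. The sketch below reproduces the nine increments of CountMoneyBolt.execute() against a map, with Redis INCRBY semantics (a missing key counts as 0). Payment is a hypothetical, trimmed-down PaymentInfo with string IDs and an integer price, which is an assumption about the real field types. OrderCounters is likewise a made-up name.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** In-memory stand-in for the Redis counters written by CountMoneyBolt. */
final class OrderCounters {

    /** Hypothetical minimal PaymentInfo: only the three values the bolt reads. */
    static final class Payment {
        final String productId;
        final String shopId;
        final long payPrice;

        Payment(String productId, String shopId, long payPrice) {
            this.productId = productId;
            this.shopId = shopId;
            this.payPrice = payPrice;
        }
    }

    private final Map<String, Long> counters = new ConcurrentHashMap<>();

    /** Same semantics as Redis INCRBY: a missing key starts at 0. */
    private void incrBy(String key, long delta) {
        counters.merge(key, delta, Long::sum);
    }

    /** Applies the same increments that CountMoneyBolt.execute() issues for one order. */
    void record(Payment p) {
        incrBy("order:total:price:date", p.payPrice);
        incrBy("order:total:user:date", 1);
        incrBy("order:num:user:date", 1);
        incrBy("order:" + p.productId + ":price:date", p.payPrice);
        incrBy("order:" + p.productId + ":user:date", 1);
        incrBy("order:" + p.productId + ":num:date", 1);
        incrBy("order:" + p.shopId + ":price:date", p.payPrice);
        incrBy("order:" + p.shopId + ":user:date", 1);
        incrBy("order:" + p.shopId + ":num:date", 1);
    }

    long get(String key) {
        return counters.getOrDefault(key, 0L);
    }
}
```

A test such as recording one order with productId "7" and shopId "7" makes the shared-prefix issue visible: "order:7:price:date" receives the price twice.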
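Running
Local mode is simple. Take this setting:
```properties
custom.storm.kafka.name=CustomStorm
```
and leave its value empty:
```properties
custom.storm.kafka.name=
```
Then start the application directly from IntelliJ IDEA.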
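Cluster mode takes a little more work. Give the topology a name:
```properties
custom.storm.kafka.name=CustomStorm
```
This setting must not be empty; any name will do.
Next, upload the jar to any server in the Storm cluster.
Then change to the Storm installation directory and run the following command: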
```shell
bin/storm jar /export/servers/read-board-0.0.1-SNAPSHOT.jar com.yi.readboard.ReadBoardApplication
```
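Command breakdown: bin/storm jar <full path of the jar> <fully qualified name of the class that contains the main method>.
Once the command succeeds, the topology appears in the Storm UI.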
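Both run modes hinge on one switch: an empty custom.storm.kafka.name means local mode, and a non-empty one means cluster mode. The post does not show the code that acts on it. In Storm 1.x such a switch is typically implemented as follows:

- local runs submit to a `LocalCluster` via `submitTopology(name, conf, topology)`;
- cluster runs call `StormSubmitter.submitTopology(name, conf, topology)`.

The JDK-only sketch below isolates just the decision. RunModes and Mode are hypothetical names, and treating a whitespace-only value as empty is a choice of this sketch, not something the post specifies.

```java
/** Hypothetical helper that maps custom.storm.kafka.name to a run mode. */
final class RunModes {

    enum Mode { LOCAL, CLUSTER }

    private RunModes() {
    }

    /** Unset, empty or blank name: local mode; anything else: cluster mode with that topology name. */
    static Mode fromTopologyName(String name) {
        return (name == null || name.trim().isEmpty()) ? Mode.LOCAL : Mode.CLUSTER;
    }

    public static void main(String[] args) {
        String name = args.length > 0 ? args[0] : "";
        switch (fromTopologyName(name)) {
            case LOCAL:
                // Typically: new LocalCluster().submitTopology(someName, conf, topology)
                System.out.println("local mode: topology runs inside this JVM");
                break;
            case CLUSTER:
                // Typically: StormSubmitter.submitTopology(name, conf, topology)
                System.out.println("cluster mode: submitting topology '" + name + "'");
                break;
            default:
                throw new IllegalStateException();
        }
    }
}
```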
That's all there is to it. Go ahead and build on it.
Code: https://github.com/HWYWL/spring-boot-2.x-examples/tree/master/spring-boot-storm