博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Storm入门(四)WordCount示例
阅读量:6161 次
发布时间:2019-06-21

本文共 7347 字,大约阅读时间需要 24 分钟。

一、关联代码

使用maven,代码如下。

pom.xml  和相同

RandomSentenceSpout.java

/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements.  See the NOTICE file * distributed with this work for additional information * regarding copyright ownership.  The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License.  You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package cn.ljh.storm.wordcount; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichSpout; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Values; import org.apache.storm.utils.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.text.SimpleDateFormat; import java.util.Date; import java.util.Map; import java.util.Random; public class RandomSentenceSpout extends BaseRichSpout { private static final Logger LOG = LoggerFactory.getLogger(RandomSentenceSpout.class); SpoutOutputCollector _collector; Random _rand; public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { _collector = collector; _rand = new Random(); } public void nextTuple() { Utils.sleep(100); String[] sentences = new String[]{ sentence("the cow jumped over the moon"), sentence("an apple a day keeps the doctor away"), sentence("four score and seven years ago"), sentence("snow white and the seven dwarfs"), sentence("i am at two with nature")}; final String sentence = sentences[_rand.nextInt(sentences.length)]; LOG.debug("Emitting tuple: {}", sentence); _collector.emit(new Values(sentence)); } protected String sentence(String input) { return input; } @Override public void ack(Object id) { } @Override public void fail(Object id) { } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word")); } // Add unique identifier to each tuple, which is helpful for debugging public static class TimeStamped extends RandomSentenceSpout { private final String prefix; public TimeStamped() { this(""); } public TimeStamped(String prefix) { this.prefix = prefix; } protected String sentence(String input) { return prefix + currentDate() + " " + input; } private String currentDate() { return new SimpleDateFormat("yyyy.MM.dd_HH:mm:ss.SSSSSSSSS").format(new Date()); } } }

WordCountTopology.java

/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements.  See the NOTICE file * distributed with this work for additional information * regarding copyright ownership.  The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License.  You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package cn.ljh.storm.wordcount; import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.storm.StormSubmitter; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.BasicOutputCollector; import org.apache.storm.topology.IRichBolt; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.TopologyBuilder; import org.apache.storm.topology.base.BaseBasicBolt; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; public class WordCountTopology { public static class SplitSentence implements IRichBolt { private OutputCollector _collector; public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word")); } public Map
getComponentConfiguration() { return null; } public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { _collector = collector; } public void execute(Tuple input) { String sentence = input.getStringByField("word"); String[] words = sentence.split(" "); for(String word : words){ this._collector.emit(new Values(word)); } } public void cleanup() { // TODO Auto-generated method stub } } public static class WordCount extends BaseBasicBolt { Map
counts = new HashMap
(); public void execute(Tuple tuple, BasicOutputCollector collector) { String word = tuple.getString(0); Integer count = counts.get(word); if (count == null) count = 0; count++; counts.put(word, count); collector.emit(new Values(word, count)); } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word", "count")); } } public static class WordReport extends BaseBasicBolt { Map
counts = new HashMap
(); public void execute(Tuple tuple, BasicOutputCollector collector) { String word = tuple.getStringByField("word"); Integer count = tuple.getIntegerByField("count"); this.counts.put(word, count); } public void declareOutputFields(OutputFieldsDeclarer declarer) { } @Override public void cleanup() { System.out.println("-----------------FINAL COUNTS START-----------------------"); List
keys = new ArrayList
(); keys.addAll(this.counts.keySet()); Collections.sort(keys); for(String key : keys){ System.out.println(key + " : " + this.counts.get(key)); } System.out.println("-----------------FINAL COUNTS END-----------------------"); } } public static void main(String[] args) throws Exception { TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("spout", new RandomSentenceSpout(), 5); //ShuffleGrouping:随机选择一个Task来发送。 builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout"); //FiledGrouping:根据Tuple中Fields来做一致性hash,相同hash值的Tuple被发送到相同的Task。 builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word")); //GlobalGrouping:所有的Tuple会被发送到某个Bolt中的id最小的那个Task。 builder.setBolt("report", new WordReport(), 6).globalGrouping("count"); Config conf = new Config(); conf.setDebug(true); if (args != null && args.length > 0) { conf.setNumWorkers(3); StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology()); } else { conf.setMaxTaskParallelism(3); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("word-count", conf, builder.createTopology()); Thread.sleep(20000); cluster.shutdown(); } } }

 

二、执行效果

 
 

转载于:https://www.cnblogs.com/liuys635/p/10786476.html

你可能感兴趣的文章
Log4J日志配置详解
查看>>
实验7 BindService模拟通信
查看>>
scanf
查看>>
Socket编程注意接收缓冲区大小
查看>>
SpringMVC初写(五)拦截器
查看>>
检测oracle数据库坏块的方法
查看>>
SQL server 安装教程
查看>>
Linux下ftp和ssh详解
查看>>
跨站脚本功攻击,xss,一个简单的例子让你知道什么是xss攻击
查看>>
js时间和时间戳之间如何转换(汇总)
查看>>
js插件---图片懒加载echo.js结合 Amaze UI ScrollSpy 使用
查看>>
java中string和int的相互转换
查看>>
P1666 前缀单词
查看>>
HTML.2文本
查看>>
Ubuntu unity安装Indicator-Multiload
查看>>
解决Eclipse中新建jsp文件ISO8859-1 编码问题
查看>>
7.对象创建型模式-总结
查看>>
【论文阅读】Classification of breast cancer histology images using transfer learning
查看>>
移动端处理图片懒加载
查看>>
jQuery.on() 函数详解
查看>>