扫二维码与项目经理沟通
我们在微信上24小时期待你的声音
解答本文疑问/技术咨询/运营咨询/技术建议/互联网交流
小编给大家分享一下storm集群WordCount的示例分析,希望大家阅读完这篇文章之后都有所收获,下面让我们一起去探讨吧!
十年的山阴网站建设经验,针对设计、前端、开发、售后、文案、推广等六对一服务,响应快,48小时及时工作处理。成都全网营销推广的优势是能够根据用户设备显示端的尺寸不同,自动调整山阴建站的显示方式,使网站能够适用不同显示终端,在浏览器中调整网站的宽度,无论在任何一种浏览器上浏览网站,都能展现优雅布局与设计,从而大程度地提升浏览体验。创新互联从事“山阴网站设计”,“山阴网站推广”以来,每个客户项目都认真落实执行。
storm集群实例运行
storm本地运行只需要storm的jar包就可以了,结果可以在控制台直接看到,storm集群运行,结果要在log日志里看,或者存储下来。并且,集群运行,execute方法里的输出可以看到,但是cleanup里的输出是看不到的,因为cleanup只有在topology结束后才会执行,而storm是实时连续的运行的,所以输出放在execute里或者保存起来查看。
wordcount实例代码
代码在前面的博客里已经写了只是将WordCounter做了点修改
package com.storm.stormDemo; import java.io.BufferedWriter; import java.io.FileWriter; import java.io.IOException; import java.util.HashMap; import java.util.Iterator; import java.util.Map; import org.apache.log4j.Logger; import com.storm.stormTest.MergeObjects; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.IRichBolt; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; public class WordCounter implements IRichBolt { public static Logger LOG = Logger.getLogger(WordCounter.class); Integer id; String name; Mapcounters; private OutputCollector collector; BufferedWriter output; public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.counters = new HashMap (); this.collector = collector; this.name = context.getThisComponentId(); this.id = context.getThisTaskId(); try { output = new BufferedWriter(new FileWriter("/home/zhanghuan/Downloads/wordcount.txt" , true)); } catch (IOException e) { // TODO Auto-generated catch block try { output.close(); } catch (IOException e1) { // TODO Auto-generated catch block e1.printStackTrace(); } e.printStackTrace(); } } public void execute(Tuple input) { String str = input.getString(0); if (!counters.containsKey(str)) { counters.put(str, 1); } else { Integer c = counters.get(str) + 1; counters.put(str, c); } Iterator iterator = counters.keySet().iterator(); while(iterator.hasNext()){ String next = iterator.next(); try { System.out.print(next + ":" + counters.get(next) + " "); output.write(next + ":" + counters.get(next) + " "); output.flush(); } catch (IOException e) { e.printStackTrace(); try { output.close(); } catch (IOException e1) { e1.printStackTrace(); } } } // 确认成功处理一个tuple collector.ack(input); } /** * Topology执行完毕的清理工作,比如关闭连接、释放资源等操作都会写在这里 * 因为这只是个Demo,我们用它来打印我们的计数器 * */ public void cleanup() { LOG.info("-- Word Counter [" + name + "-" + id + "] --"); for (Map.Entry entry : counters.entrySet()) { LOG.info(entry.getKey() + ": " + entry.getValue()); } counters.clear(); } public void declareOutputFields(OutputFieldsDeclarer declarer) { // TODO Auto-generated method stub //declarer.declare(new Fields("word","number")); } public Map getComponentConfiguration() { // TODO Auto-generated method stub return null; } }
集群运行
storm jar StormDemo.jar com.storm.stormDemo.WordCountTopologyMain StormDemo /home/zhanghuan/Downloads/test.txt
注意:主函数路径要写全
如果集群报错如下:
则打开你打的第三方jar包文件夹,在里面找到storm-core-0.10.0.jar,删除这个jar包里的default.yarml文件,或则删掉你打的storm jar包。
topology提交后,会启动相应数量的worker进程和logwriter进程,ui界面上也能看到这个topology的运行
这时候你就可以查看log日志文件或者存储位置,查看结果。
没有数据输入的时候,日志就像最下方一样,保持通信。
停止topology运行
storm kill topology的名字
看完了这篇文章,相信你对“storm集群WordCount的示例分析”有了一定的了解,如果想了解更多相关知识,欢迎关注创新互联行业资讯频道,感谢各位的阅读!
我们在微信上24小时期待你的声音
解答本文疑问/技术咨询/运营咨询/技术建议/互联网交流