使用Storm实现实时大数据分析!
2013-01-12 19:10:27 来源:互联网 评论:0 点击:
随着数据体积的越来越大,实时处理成为了许多机构需要面对的首要挑战。Shruthi Kumar和Siddharth Patankar在Dr.Dobb’s上结合了汽车超速监视,为我们演示了使用Storm进行实时大数据分析。CSDN在此编译
- {
- fileReader = new BufferedReader(new FileReader(new File(file)));
- }
- catch (FileNotFoundException e)
- {
- System.exit(1);
- }
- }
- public void nextTuple()
- {
- protected void ListenFile(File file)
- {
- Utils.sleep(2000);
- RandomAccessFile access = null;
- String line = null;
- try
- {
- while ((line = access.readLine()) != null)
- {
- if (line !=null)
- {
- String[] fields=null;
- if (tupleInfo.getDelimiter().equals("|")) fields = line.split("\\"+tupleInfo.getDelimiter());
- else
- fields = line.split (tupleInfo.getDelimiter());
- if (tupleInfo.getFieldList().size() == fields.length) _collector.emit(new Values(fields));
- }
- }
- }
- catch (IOException ex){ }
- }
- }
- public void declareOutputFields(OutputFieldsDeclarer declarer)
- {
- String[] fieldsArr = new String [tupleInfo.getFieldList().size()];
- for(int i=0; i<tupleInfo.getFieldList().size(); i++)
- {
- fieldsArr[i] = tupleInfo.getFieldList().get(i).getColumnName();
- }
- declarer.declare(new Fields(fieldsArr));
- }
declareOutputFileds()决定了tuple发射的格式,这样的话Bolt就可以用类似的方法将tuple译码。Spout持续对日志文件的数据的变更进行监听,一旦有添加Spout就会进行读入并且发送给Bolt进行处理。
Bolt的实现
Spout的输出结果将给予Bolt进行更深一步的处理。经过对用例的思考,我们的topology中需要如Figure 3中的两个Bolt。
Figure 3:Spout到Bolt的数据流程。
ThresholdCalculatorBolt
Spout将tuple发出,由ThresholdCalculatorBolt接收并进行临界值处理。在这里,它将接收好几项输入进行检查;分别是:
临界值检查
- 临界值栏数检查(拆分成字段的数目)
- 临界值数据类型(拆分后字段的类型)
- 临界值出现的频数
- 临界值时间段检查
Listing Four中的类,定义用来保存这些值。
Listing Four:ThresholdInfo类
-
public class ThresholdInfo implementsSerializable
- {
- private String action;
- private String rule;
- private Object thresholdValue;
- private int thresholdColNumber;
- private Integer timeWindow;
- private int frequencyOfOccurence;
- }
基于字段中提供的值,临界值检查将被Listing Five中的execute()方法执行。代码大部分的功能是解析和接收值的检测。
Listing Five:临界值检测代码段
-
public void execute(Tuple tuple, BasicOutputCollector collector)
- {
- if(tuple!=null)
- {
- List<Object> inputTupleList = (List<Object>) tuple.getValues();
- int thresholdColNum = thresholdInfo.getThresholdColNumber();
- Object thresholdValue = thresholdInfo.getThresholdValue();
- String thresholdDataType = tupleInfo.getFieldList().get(thresholdColNum-1).getColumnType();
- Integer timeWindow = thresholdInfo.getTimeWindow();
- int frequency = thresholdInfo.getFrequencyOfOccurence();
- if(thresholdDataType.equalsIgnoreCase("string"))
- {
- String valueToCheck = inputTupleList.get(thresholdColNum-1).toString();
- String frequencyChkOp = thresholdInfo.getAction();
- if(timeWindow!=null)
- {
- long curTime = System.currentTimeMillis();
- long diffInMinutes = (curTime-startTime)/(1000);
- if(diffInMinutes>=timeWindow)
- {
- if(frequencyChkOp.equals("=="))
- {
- if(valueToCheck.equalsIgnoreCase(thresholdValue.toString()))
- {
- count.incrementAndGet();
- if(count.get() > frequency)
上一篇:网站统计中的数据收集原理及实现(二)
下一篇:大数据定义和十大应用案例
分享到:
收藏
评论排行
- ·Windows(Win7)下用Xming...(92)
- ·使用jmx client监控activemq(20)
- ·Hive查询OOM分析(14)
- ·复杂网络架构导致的诡异...(8)
- ·使用 OpenStack 实现云...(7)
- ·影响Java EE性能的十大问题(6)
- ·云计算平台管理的三大利...(6)
- ·Mysql数据库复制延时分析(5)
- ·OpenStack Nova开发与测...(4)
- ·LTPP一键安装包1.2 发布(4)
- ·Linux下系统或服务排障的...(4)
- ·PHP发布5.4.4 和 5.3.1...(4)
- ·RSYSLOG搭建集中日志管理服务(4)
- ·转换程序源码的编码格式[...(3)
- ·Linux 的木马程式 Wirenet 出现(3)
- ·Nginx 发布1.2.1稳定版...(3)
- ·zend framework文件读取漏洞分析(3)
- ·Percona Playback 0.3 development release(3)
- ·运维业务与CMDB集成关系一例(3)
- ·应该知道的Linux技巧(3)