使用Storm实现实时大数据分析!
2013-01-12 19:10:27 来源:互联网 评论:0 点击:
随着数据体积的越来越大,实时处理成为了许多机构需要面对的首要挑战。Shruthi Kumar和Siddharth Patankar在Dr.Dobb’s上结合了汽车超速监视,为我们演示了使用Storm进行实时大数据分析。CSDN在此编译
- count.incrementAndGet();
- if(count.get() > frequency)
- splitAndEmit(inputTupleList,collector);
- }
- }
- else if(frequencyChkOp.equals(">"))
- {
- if(valueToCheck > Double.parseDouble(thresholdValue.toString()))
- {
- count.incrementAndGet();
- if(count.get() > frequency)
- splitAndEmit(inputTupleList,collector);
- }
- }
- else if(frequencyChkOp.equals("=="))
- {
- if(valueToCheck == Double.parseDouble(thresholdValue.toString()))
- {
- count.incrementAndGet();
- if(count.get() > frequency)
- splitAndEmit(inputTupleList,collector);
- }
- }
- else if(frequencyChkOp.equals("!="))
- {
- . . .
- }
- }
- }
- else
- splitAndEmit(null,collector);
- }
- else
- {
- System.err.println("Emitting null in bolt");
- splitAndEmit(null,collector);
- }
- }
经由Bolt发送的的tuple将会传递到下一个对应的Bolt,在我们的用例中是DBWriterBolt。
DBWriterBolt
经过处理的tuple必须被持久化以便于触发tigger或者更深层次的使用。DBWiterBolt做了这个持久化的工作并把tuple存入了数据库。表的建立由prepare()函数完成,这也将是topology调用的第一个方法。方法的编码如Listing Six所示。
Listing Six:建表编码。
-
public void prepare( Map StormConf, TopologyContext context )
- {
- try
- {
- Class.forName(dbClass);
- }
- catch (ClassNotFoundException e)
- {
- System.out.println("Driver not found");
- e.printStackTrace();
- }
- try
- {
- connection driverManager.getConnection(
- "jdbc:mysql://"+databaseIP+":"+databasePort+"/"+databaseName, userName, pwd);
- connection.prepareStatement("DROP TABLE IF EXISTS "+tableName).execute();
- StringBuilder createQuery = new StringBuilder(
- "CREATE TABLE IF NOT EXISTS "+tableName+"(");
- for(Field fields : tupleInfo.getFieldList())
- {
- if(fields.getColumnType().equalsIgnoreCase("String"))
- createQuery.append(fields.getColumnName()+" VARCHAR(500),");
- else
- createQuery.append(fields.getColumnName()+" "+fields.getColumnType()+",");
- }
- createQuery.append("thresholdTimeStamp timestamp)");
- connection.prepareStatement(createQuery.toString()).execute();
- // Insert Query
- StringBuilder insertQuery = new StringBuilder("INSERT INTO "+tableName+"(");
- String tempCreateQuery = new String();
- for(Field fields
上一篇:网站统计中的数据收集原理及实现(二)
下一篇:大数据定义和十大应用案例
分享到:
收藏
评论排行
- ·Windows(Win7)下用Xming...(92)
- ·使用jmx client监控activemq(20)
- ·Hive查询OOM分析(14)
- ·复杂网络架构导致的诡异...(8)
- ·使用 OpenStack 实现云...(7)
- ·影响Java EE性能的十大问题(6)
- ·云计算平台管理的三大利...(6)
- ·Mysql数据库复制延时分析(5)
- ·OpenStack Nova开发与测...(4)
- ·LTPP一键安装包1.2 发布(4)
- ·Linux下系统或服务排障的...(4)
- ·PHP发布5.4.4 和 5.3.1...(4)
- ·RSYSLOG搭建集中日志管理服务(4)
- ·转换程序源码的编码格式[...(3)
- ·Linux 的木马程式 Wirenet 出现(3)
- ·Nginx 发布1.2.1稳定版...(3)
- ·zend framework文件读取漏洞分析(3)
- ·Percona Playback 0.3 development release(3)
- ·运维业务与CMDB集成关系一例(3)
- ·应该知道的Linux技巧(3)