使用Storm实现实时大数据分析!
2013-01-12 19:10:27 来源:互联网 评论:0 点击:
随着数据体积的越来越大,实时处理成为了许多机构需要面对的首要挑战。Shruthi Kumar和Siddharth Patankar在Dr.Dobb’s上结合了汽车超速监视,为我们演示了使用Storm进行实时大数据分析。CSDN在此编译
- for(Field fields : tupleInfo.getFieldList())
- {
- insertQuery.append(fields.getColumnName()+",");
- }
- insertQuery.append("thresholdTimeStamp").append(") values (");
- for(Field fields : tupleInfo.getFieldList())
- {
- insertQuery.append("?,");
- }
- insertQuery.append("?)");
- prepStatement = connection.prepareStatement(insertQuery.toString());
- }
- catch (SQLException e)
- {
- e.printStackTrace();
- }
- }
数据分批次的插入数据库。插入的逻辑由Listting Seven中的execute()方法提供。大部分的编码都是用来实现可能存在不同类型输入的解析。
Listing Seven:数据插入的代码部分。
-
public void execute(Tuple tuple, BasicOutputCollector collector)
- {
- batchExecuted=false;
- if(tuple!=null)
- {
- List<Object> inputTupleList = (List<Object>) tuple.getValues();
- int dbIndex=0;
- for(int i=0;i<tupleInfo.getFieldList().size();i++)
- {
- Field field = tupleInfo.getFieldList().get(i);
- try {
- dbIndex = i+1;
- if(field.getColumnType().equalsIgnoreCase("String"))
- prepStatement.setString(dbIndex, inputTupleList.get(i).toString());
- else if(field.getColumnType().equalsIgnoreCase("int"))
- prepStatement.setInt(dbIndex,
- Integer.parseInt(inputTupleList.get(i).toString()));
- else if(field.getColumnType().equalsIgnoreCase("long"))
- prepStatement.setLong(dbIndex,
- Long.parseLong(inputTupleList.get(i).toString()));
- else if(field.getColumnType().equalsIgnoreCase("float"))
- prepStatement.setFloat(dbIndex,
- Float.parseFloat(inputTupleList.get(i).toString()));
- else if(field.getColumnType().equalsIgnoreCase("double"))
- prepStatement.setDouble(dbIndex,
- Double.parseDouble(inputTupleList.get(i).toString()));
- else if(field.getColumnType().equalsIgnoreCase("short"))
- prepStatement.setShort(dbIndex,
- Short.parseShort(inputTupleList.get(i).toString()));
- else if(field.getColumnType().equalsIgnoreCase("boolean"))
- prepStatement.setBoolean(dbIndex,
- Boolean.parseBoolean(inputTupleList.get(i).toString()));
- else if(field.getColumnType().equalsIgnoreCase("byte"))
- prepStatement.setByte(dbIndex,
- Byte.parseByte(inputTupleList.get(i).toString()));
- else if(field.getColumnType().equalsIgnoreCase("Date"))
- {
- Date dateToAdd=null;
- if (!(inputTupleList.get(i) instanceof Date))
- {
- DateFormat df = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");
- try
- {
- dateToAdd = df.parse(inputTupleList.get(i).toString());
- }
- catch (ParseException e)
- {
- System.err.println("Data type not valid");
- }
- }
- else
- {
- dateToAdd = (Date)inputTupleList.get(i);
- java.sql.Date sqlDate = new java.sql.Date(dateToAdd.getTime());
- prepStatement.setDate(dbIndex, sqlDate);
- }
上一篇:网站统计中的数据收集原理及实现(二)
下一篇:大数据定义和十大应用案例
分享到:
收藏
评论排行
- ·Windows(Win7)下用Xming...(92)
- ·使用jmx client监控activemq(20)
- ·Hive查询OOM分析(14)
- ·复杂网络架构导致的诡异...(8)
- ·使用 OpenStack 实现云...(7)
- ·影响Java EE性能的十大问题(6)
- ·云计算平台管理的三大利...(6)
- ·Mysql数据库复制延时分析(5)
- ·OpenStack Nova开发与测...(4)
- ·LTPP一键安装包1.2 发布(4)
- ·Linux下系统或服务排障的...(4)
- ·PHP发布5.4.4 和 5.3.1...(4)
- ·RSYSLOG搭建集中日志管理服务(4)
- ·转换程序源码的编码格式[...(3)
- ·Linux 的木马程式 Wirenet 出现(3)
- ·Nginx 发布1.2.1稳定版...(3)
- ·zend framework文件读取漏洞分析(3)
- ·Percona Playback 0.3 development release(3)
- ·运维业务与CMDB集成关系一例(3)
- ·应该知道的Linux技巧(3)