美文网首页
Storm从入门到精通11:Storm与JDBC集成

Storm从入门到精通11:Storm与JDBC集成

作者: 金字塔下的小蜗牛 | 来源:发表于2020-04-04 23:15 被阅读0次

1.导入Jar包

将Storm处理的结果存放到MySQL数据库中,需要依赖下面一些Jar包:

$STORM_HOME/external/sql/storm-sql-core*.jar
$STORM_HOME/external/storm-jdbc/storm-jdbc-1.0.3.jar
mysql-connector-java-5.1.7-bin.jar
commons-lang3-3.1.jar

2.示例

将Storm的计算结果存入MySQL:以Storm的WordCount程序为例

2.1创建Spout

创建Spout(WordCountSpout)组件采集数据,作为整个Topology的数据源

/**
 * Spout that serves as the data source of the WordCount topology.
 * Each call to {@link #nextTuple()} waits three seconds and then emits one
 * randomly chosen sample sentence in the single output field "sentence".
 */
public class WordCountSpout extends BaseRichSpout{
    //Sample sentences that simulate an external data source
    private final String[] data = {"I love Beijing",
                                   "I love China",
                                   "Beijing is the capital of China"};
    //Used to send tuples to the next component
    private SpoutOutputCollector collector;
    //Created once in open() instead of once per emitted tuple
    private Random random;

    /**
     * Emits one randomly selected sentence from the sample data.
     */
    @Override
    public void nextTuple(){
        //Throttle the simulated source to one sentence every 3 seconds
        Utils.sleep(3000);
        //Bound the index by the array length so the sample data can grow or shrink safely
        String sentence = data[random.nextInt(data.length)];
        this.collector.emit(new Values(sentence));
    }

    /**
     * Spout initialization: stores the collector and creates the random generator.
     *
     * @param arg0 topology configuration (unused)
     * @param arg1 topology context (unused)
     * @param collector collector used to emit tuples downstream
     */
    @Override
    public void open(Map arg0, TopologyContext arg1, SpoutOutputCollector collector){
        this.collector = collector;
        this.random = new Random();
    }

    /**
     * Declares the single output field "sentence".
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer){
        declarer.declare(new Fields("sentence"));
    }
}

2.2创建Bolt

创建Bolt(WordCountSplitBolt)组件进行分词操作

/**
 * Bolt that splits each incoming sentence into words and emits one
 * ("word", "count") tuple per word, with count fixed at 1.
 */
public class WordCountSplitBolt extends BaseRichBolt{
    private OutputCollector collector;

    /**
     * Tokenizes the "sentence" field of the input tuple and emits every
     * non-empty word with a count of 1.
     *
     * @param tuple input tuple carrying the field "sentence"
     */
    @Override
    public void execute(Tuple tuple){
        String sentence = tuple.getStringByField("sentence");
        //Split on runs of whitespace so repeated spaces do not produce empty words
        String[] words = sentence.trim().split("\\s+");
        for(String word:words){
            //An empty or blank sentence still yields one empty token; skip it
            if(word.isEmpty()){
                continue;
            }
            //Anchor the emitted tuple to its input so it stays in the input's tuple tree
            this.collector.emit(tuple,new Values(word,1));
        }
        //BaseRichBolt does not ack automatically; every input must be acked explicitly
        this.collector.ack(tuple);
    }

    /**
     * Bolt initialization: stores the collector used for emitting and acking.
     *
     * @param arg0 topology configuration (unused)
     * @param arg1 topology context (unused)
     * @param collector collector used to emit tuples downstream
     */
    @Override
    public void prepare(Map arg0, TopologyContext arg1, OutputCollector collector){
        this.collector = collector;
    }

    /**
     * Declares the output fields "word" and "count".
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer){
        declarer.declare(new Fields("word","count"));
    }
}

2.3创建Bolt

创建Bolt(WordCountBoltCount)组件进行单词计数操作

/**
 * Bolt that keeps a running total per word and emits the updated
 * ("word", "total") pair for every incoming ("word", "count") tuple.
 */
public class WordCountBoltCount extends BaseRichBolt{
    //Running totals seen by this bolt instance, keyed by word
    private Map<String, Integer> result = new HashMap<String, Integer>();
    private OutputCollector collector;

    /**
     * Adds the incoming count to the word's running total and emits the new total.
     *
     * @param tuple input tuple carrying the fields "word" and "count"
     */
    @Override
    public void execute(Tuple tuple){
        String word = tuple.getStringByField("word");
        int count = tuple.getIntegerByField("count");
        Integer previous = result.get(word);
        //A first occurrence starts from the incoming count rather than a hard-coded 1
        int total = (previous == null) ? count : previous + count;
        result.put(word,total);
        //Send the updated total to the next bolt, anchored to the input tuple
        this.collector.emit(tuple,new Values(word,total));
        //BaseRichBolt does not ack automatically; every input must be acked explicitly
        this.collector.ack(tuple);
    }

    /**
     * Bolt initialization: stores the collector used for emitting and acking.
     *
     * @param arg0 topology configuration (unused)
     * @param arg1 topology context (unused)
     * @param collector collector used to emit tuples downstream
     */
    @Override
    public void prepare(Map arg0, TopologyContext arg1, OutputCollector collector){
        this.collector = collector;
    }

    /**
     * Declares the output fields "word" and "total".
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer){
        declarer.declare(new Fields("word","total"));
    }
}

2.4创建主程序Topology

创建主程序Topology(WordCountTopology)

/**
 * Main program: wires the WordCount topology
 * (spout -> split bolt -> count bolt -> JDBC insert bolt) and submits it
 * to the Storm cluster under the name given as the first argument.
 */
public class WordCountTopology{
    //Target table; it must be created in MySQL beforehand
    private static final String TABLE_NAME = "result";

    /**
     * Creates the JDBC insert bolt that writes incoming tuples into {@code TABLE_NAME}.
     * The mapper is given the same table name as the insert bolt so both refer to one table.
     * NOTE(review): SimpleJdbcMapper is documented to match tuple fields to column
     * names, so the table presumably needs columns "word" and "total" - confirm schema.
     *
     * @return the configured insert bolt
     */
    private static IRichBolt createJDBCBolt(){
        ConnectionProvider connectionProvider = new MyConnectionProvider();
        JdbcMapper simpleJdbcMapper = new SimpleJdbcMapper(TABLE_NAME,connectionProvider);
        return new JdbcInsertBolt(connectionProvider,simpleJdbcMapper)
                   .withTableName(TABLE_NAME).withQueryTimeoutSecs(30);
    }

    /**
     * Builds and submits the topology.
     *
     * @param args args[0] is the name under which the topology is submitted
     * @throws IllegalArgumentException if no topology name is supplied
     * @throws Exception if the cluster rejects the submission
     */
    public static void main(String[] args) throws Exception{
        if(args.length < 1){
            throw new IllegalArgumentException("Usage: WordCountTopology <topology-name>");
        }
        TopologyBuilder builder = new TopologyBuilder();
        //Spout: the data source
        builder.setSpout("wordcount_spout",new WordCountSpout());
        //First bolt: splits sentences into words
        builder.setBolt("wordcount_splitbolt",new WordCountSplitBolt())
               .shuffleGrouping("wordcount_spout");
        //Second bolt: counts words; group by "word" so one word always reaches the same task
        builder.setBolt("wordcount_count",new WordCountBoltCount())
               .fieldsGrouping("wordcount_splitbolt",new Fields("word"));
        //Third bolt: persists every (word,total) pair to MySQL
        builder.setBolt("wordcount_jdbcbolt",createJDBCBolt())
               .shuffleGrouping("wordcount_count");
        //Create the topology
        StormTopology wc = builder.createTopology();
        Config config = new Config();
        //Submit to the Storm cluster
        StormSubmitter.submitTopology(args[0],config,wc);
    }
}

2.5实现ConnectionProvider接口

/**
 * ConnectionProvider that opens plain DriverManager connections to the demo
 * MySQL database. The JDBC driver class is loaded once when this class is initialized.
 */
class MyConnectionProvider implements ConnectionProvider{
    //NOTE(review): connection settings and credentials are hard-coded for the demo;
    //load them from configuration before using this outside a tutorial
    private static final String DRIVER = "com.mysql.jdbc.Driver";
    private static final String URL = "jdbc:mysql://192.168.126.110:3306/demo";
    private static final String USER = "root";
    private static final String PASSWORD = "123456";

    static{
        try{
            Class.forName(DRIVER);
        }catch(ClassNotFoundException e){
            //Without the driver no connection can ever be opened, so fail class initialization
            throw new ExceptionInInitializerError(e);
        }
    }

    /**
     * Opens a new connection to the configured database.
     *
     * @return a newly opened connection, never null
     * @throws IllegalStateException if the connection cannot be opened
     */
    @Override
    public Connection getConnection(){
        try{
            return DriverManager.getConnection(URL,USER,PASSWORD);
        }catch(SQLException e){
            //Fail here with the cause attached instead of returning null to the caller
            throw new IllegalStateException("Failed to open JDBC connection to "+URL,e);
        }
    }

    /** Nothing to release: no connections are pooled by this provider. */
    @Override
    public void cleanup(){}

    /** Nothing to set up: connections are opened on demand. */
    @Override
    public void prepare(){}
}

相关文章

网友评论

      本文标题:Storm从入门到精通11:Storm与JDBC集成

      本文链接:https://www.haomeiwen.com/subject/adjkdhtx.html