美文网首页
hadoop实战-8.stormProject Idea、mav

hadoop实战-8.stormProject Idea、mav

作者: 笨鸡 | 来源:发表于2019-04-01 14:39 被阅读0次

1.创建maven项目

添加storm依赖 pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build descriptor for the Storm demo project (artifact storm_t). -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <!-- Project coordinates; no packaging is declared, so Maven's default (jar) applies. -->
    <groupId>com.ctgu.ct</groupId>
    <artifactId>storm_t</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <!--
            Storm core API, version 1.2.2.
            The "provided" scope keeps storm-core on the compile classpath but expects the
            runtime environment to supply it. Presumably the Storm cluster provides it when
            the jar is submitted; confirm before running the topology any other way.
        -->
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>1.2.2</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>
</project>

2.创建Spout,Bolt

RandomWordSpout.java

package StormProject;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

import java.util.Map;
import java.util.Random;

/**
 * Spout that repeatedly emits a randomly chosen product name.
 *
 * <p>Every emitted tuple carries a single value, declared under the field name
 * {@code "orignname"}. After each emit the spout sleeps via {@code Utils.sleep(500)}
 * to throttle the stream.
 */
public class RandomWordSpout extends BaseRichSpout {

    /** Candidate product names; one of them is picked for every emitted tuple. */
    final String[] words = {"iphone","xiaomi","mate","sony","sumsung","moto","meizu"};

    /** Collector handed over in {@link #open}; used to emit tuples. */
    private SpoutOutputCollector collector;

    /**
     * Random source, created once in {@link #open} instead of once per tuple.
     * Marked transient so it is not part of the spout's serialized form.
     */
    private transient Random random;

    /**
     * Stores the collector and creates the random source used by {@link #nextTuple()}.
     *
     * @param map             topology configuration (unused)
     * @param topologyContext context of this spout task (unused)
     * @param collector       collector used to emit tuples
     */
    @Override
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector collector) {
        this.collector = collector;
        this.random = new Random();
    }

    /**
     * Emits one randomly selected entry of {@link #words}, then sleeps.
     */
    @Override
    public void nextTuple() {
        String goodName = words[random.nextInt(words.length)];

        collector.emit(new Values(goodName));

        // Throttle the stream so the demo output stays readable.
        Utils.sleep(500);
    }

    /**
     * Declares the single output field of this spout.
     *
     * @param outputFieldsDeclarer declarer receiving the output schema
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("orignname"));
    }
}

UpperBolt.java

package StormProject;

import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.Locale;

/**
 * Bolt that upper-cases the first value of every incoming tuple and emits the result
 * as a single-field tuple declared under the name {@code "uppername"}.
 */
public class UpperBolt extends BaseBasicBolt {

    /**
     * Reads the string at position 0 of the tuple, converts it to upper case and emits it.
     *
     * @param tuple                incoming tuple; position 0 is read as a string
     * @param basicOutputCollector collector used to emit the upper-cased value
     */
    @Override
    public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
        String goodName = tuple.getString(0);
        // Locale.ROOT keeps the result independent of the worker JVM's default locale
        // (e.g. a Turkish default locale would otherwise map 'i' to a dotted capital I).
        String upperName = goodName.toUpperCase(Locale.ROOT);
        basicOutputCollector.emit(new Values(upperName));
    }

    /**
     * Declares the single output field of this bolt.
     *
     * @param outputFieldsDeclarer declarer receiving the output schema
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("uppername"));
    }
}

SuffixBolt.java

package StormProject;

import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Tuple;

import java.io.FileWriter;
import java.io.IOException;
import java.util.Map;
import java.util.UUID;

/**
 * Terminal bolt that appends {@code " IT IS OK"} to the first value of every incoming
 * tuple and writes the result as one line to a file under {@code /root/storm_dir/}.
 *
 * <p>Each prepared instance writes to its own file, named with a random UUID.
 * This bolt emits nothing and declares no output fields.
 */
public class SuffixBolt extends BaseBasicBolt {

    /**
     * Output file writer, opened in {@link #prepare} and closed in {@link #cleanup()}.
     * Transient because {@link FileWriter} is not serializable.
     */
    transient FileWriter fileWriter = null;

    /**
     * Opens the output file for this bolt instance.
     *
     * @param stormConf topology configuration (unused)
     * @param context   context of this bolt task (unused)
     * @throws RuntimeException wrapping the {@link IOException} if the file cannot be opened
     */
    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        try{
            fileWriter = new FileWriter("/root/storm_dir/"+ UUID.randomUUID());
        }catch(IOException e){
            throw new RuntimeException(e);
        }
    }

    /**
     * Writes the suffixed value followed by a newline and flushes immediately, so the
     * line is visible to readers of the file right away.
     *
     * @param tuple                incoming tuple; position 0 is read as a string
     * @param basicOutputCollector unused, this bolt does not emit
     * @throws RuntimeException wrapping the {@link IOException} if writing fails
     */
    @Override
    public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
        String suffixName = tuple.getString(0) + " IT IS OK";
        try{
            fileWriter.write(suffixName);
            fileWriter.write("\n");
            fileWriter.flush();
        }catch(IOException e){
            throw new RuntimeException(e);
        }
    }

    /**
     * Closes the output file so the file handle is not leaked when the bolt is shut down.
     * Does nothing if {@link #prepare} never opened a writer.
     *
     * @throws RuntimeException wrapping the {@link IOException} if closing fails
     */
    @Override
    public void cleanup() {
        if (fileWriter == null) {
            return;
        }
        try{
            fileWriter.close();
        }catch(IOException e){
            throw new RuntimeException(e);
        }finally{
            // Drop the reference even if close() failed, so it is never reused.
            fileWriter = null;
        }
    }

    /**
     * Declares no output fields because this bolt does not emit tuples.
     *
     * @param outputFieldsDeclarer declarer receiving the output schema (left untouched)
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {

    }
}

TopoMain.java

package StormProject;

import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.topology.TopologyBuilder;

/**
 * Entry point that wires the demo topology (spout, upper-case bolt, suffix bolt)
 * and submits it under the name {@code "demotopo"}.
 */
public class TopoMain {

    /**
     * Builds the topology and its configuration, then submits both via
     * {@link StormSubmitter}.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if building or submitting the topology fails
     */
    public static void main(String[] args) throws Exception{
        StormTopology demoTopology = assembleTopology();
        Config submitConf = assembleConfig();
        StormSubmitter.submitTopology("demotopo", submitConf, demoTopology);
    }

    /**
     * Wires the pipeline randomspout -> upperbolt -> suffixbolt; each component has a
     * parallelism hint of 4 and the bolts use shuffle grouping.
     */
    private static StormTopology assembleTopology() {
        TopologyBuilder wiring = new TopologyBuilder();

        wiring.setSpout("randomspout", new RandomWordSpout(), 4);

        wiring.setBolt("upperbolt", new UpperBolt(), 4)
                .shuffleGrouping("randomspout");

        wiring.setBolt("suffixbolt", new SuffixBolt(), 4)
                .shuffleGrouping("upperbolt");

        return wiring.createTopology();
    }

    /** Creates the submit configuration: 4 workers, debug enabled, 0 ackers. */
    private static Config assembleConfig() {
        Config settings = new Config();
        settings.setNumWorkers(4);
        settings.setDebug(true);
        settings.setNumAckers(0);
        return settings;
    }
}

3.打包上传并运行

storm_jar.png

浏览器

storm_run.png

tail -f filename(filename 为 SuffixBolt 在 /root/storm_dir/ 目录下生成的、以 UUID 命名的输出文件)


storm_result.png

喜欢的话,希望您动动小手点个赞支持下哦

相关文章

网友评论

      本文标题:hadoop实战-8.stormProject Idea、mav

      本文链接:https://www.haomeiwen.com/subject/brkvbqtx.html