Kafka初识

作者: 033a1d1f0c58 | 来源:发表于2016-07-24 18:16 被阅读174次

问题一

写出增加Kafka的Partition命令

bin/kafka-add-partitions.sh --topic test --partition 2  --zookeeper  192.168.197.170:2181,192.168.197.171:2181

问题二

列出配置Kafka删除日志的配置参数

参数 说明(解释)
log.roll.hours =24*7 这个参数会在日志segment没有达到log.segment.bytes设置的大小,也会强制新建一个segment会被 topic创建时的指定参数覆盖
log.cleanup.policy = delete 日志清理策略选择有:delete和compact主要针对过期数据的处理,或是日志文件达到限制的额度,会被 topic创建时的指定参数覆盖
log.cleaner.io.buffer.size=512*1024 日志清理时候用到的IO块大小一般不需要修改
log.cleaner.io.buffer.load.factor =0.9 日志清理中hash表的扩大因子一般不需要修改
log.cleaner.backoff.ms=15000 检查是否处罚日志清理的间隔
log.cleaner.min.cleanable.ratio=0.5 日志清理的频率控制,越大意味着更高效的清理,同时会存在一些空间上的浪费,会被topic创建时的指定参数覆盖

问题三

Kafka中有一个topic: tfc_pass,实时接收过车记录,字段为:号牌(hphm)、过车时间(gcsj)、速度(sd);inceptor中有一张黑名单的表,包括字段号牌号码(hphm);请编写一个StreamJob,实时将属于黑名单表的过车记录写到结果表result中。

Producer

public classClusterProducer extends Thread {
  private static final Log log =LogFactory.getLog(ClusterProducer.class);
 
  public void sendData() {
    Random rnd = new Random();
    Properties props =PropertiesParser.getProperties(PropertiesSettings.PRODUCER_FILE_NAME);
    if (props == null) {
      log.error("can't loadspecified file " + PropertiesSettings.PRODUCER_FILE_NAME);
       return;
    }
    //set the producer configurationproperties
    ProducerConfig config = newProducerConfig(props);
    Producer<String, String> producer= new Producer<String, String>(config);
 
    //Send the data
    KeyedMessage<String, String> data;
    int count = 0;
    String url = "jdbc:inceptor://127.0.0.1:1433;DatabaseName=tmp";  
    String username = "sa";  
    String password = "";  
  
    Connection conn = DriverManager.getConnection(url, username, password);  
    Statement stmt = conn.createStatement();  
    ResultSet rs = stmt.executeQuery("select hphm from tfc_pass");  
    while (rs.next()) {  
    data = new KeyedMessage<String,String>("tfc_pass", count+'', rs.getString("hphm") );
    producer.send(data);
    count++;
    } 
    
    stmt.close();  
    conn.close();  
    producer.close();
  }
 
  public void run() {
    sendData();
  }
 
  public static void main(String[] args) {
    new ClusterProducer().sendData();
  }
}

Consumer

public class Consumerextends Thread {
  private static final Log log =LogFactory.getLog(Consumer.class);
  private final ConsumerConnector consumer;
  private final String topic;
 
  public Consumer(String topic) {
    consumer =kafka.consumer.Consumer.createJavaConsumerConnector(
        createConsumerConfig());
    this.topic = topic;
  }
 
  private static ConsumerConfigcreateConsumerConfig() {
    Properties props = new Properties();
     props.put("zookeeper.connect", KafkaProperties.zkConnect);
    props.put("group.id",KafkaProperties.groupId);
     props.put("zookeeper.session.timeout.ms", "400");
     props.put("zookeeper.sync.time.ms", "200");
     props.put("auto.commit.interval.ms", "1000");
 
    return new ConsumerConfig(props);
 
  }
 
  public void run() {
    String url = "jdbc:inceptor://127.0.0.1:1433;DatabaseName=tmp";  
    String username = "sa";  
    String password = ""; 
    Connection conn = DriverManager.getConnection(url, username, password);  
    Statement stmt = conn.createStatement();  
    
    Map<String, Integer>topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(topic, newInteger(1));
    Map<String,List<KafkaStream<byte[], byte[]>>> consumerMap =consumer.createMessageStreams(topicCountMap);
    KafkaStream<byte[], byte[]>stream = consumerMap.get(topic).get(0);
    ConsumerIterator<byte[], byte[]>it = stream.iterator();
    while (it.hasNext()) {
      String val = new String(it.next().message());
      stmt.executeUpdate("insert into blacklist values('"+val+"')") ;
    }
    
    stmt.close();  
    conn.close();  
  }
 
  public static void main(String[] args) {
    Consumer client = new Consumer("cluster_statistics_topic");
    client.run();
  }
}

相关文章

  • Kafka核心API——Stream API

    Kafka Stream概念及初识高层架构图 Kafka Stream是Apache Kafka从0.10版本引入...

  • 入门 Kafka必看

    初识 Kafka 什么是 Kafka Kafka 是由 Linkedin 公司开发的,它是一个分布式的,支持多分区...

  • 真的,Kafka 入门一篇文章就够了

    初识 Kafka 什么是 Kafka Kafka 是由 Linkedin 公司开发的,它是一个分布式的,支持多分区...

  • 初识kafka

    大部分内容是翻译至Kafka The Definitive Guide,有翻译不好的地方请见谅,因为理解问题翻译错...

  • 初识kafka

    什么是kafka? 官网的解释是,Kafka是一个分布式的流计算的平台。它提供了三个主要的功能:1、实现了发布和订...

  • 初识Kafka

    首先访问Kafka的官网 http://kafka.apache.org 点击左下角的Download 我们下载 ...

  • 初识Kafka

    开篇 在微服务的架构设计中我们一般都会考虑服务之间互相调用的问题,如何做到更好的解耦设计。在秒杀的系统中会使用异步...

  • Kafka初识

    问题一 写出增加Kafka的Partition命令 问题二 列出配置Kafka删除日志的配置参数 问题三 Kafk...

  • 初识Kafka

    Kafka是由scala和java编写的一款高吞吐量分布式发布订阅消息系统。 应用场景: 异步处理 应用解耦 流量...

  • 初识Kafka

    1、Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞...

网友评论

    本文标题:Kafka初识

    本文链接:https://www.haomeiwen.com/subject/xhqgjttx.html