storm 读取不到对应的kafka数据

  1. 云栖社区>
  2. 博客>
  3. 正文

storm 读取不到对应的kafka数据

bymain 2018-05-03 13:10:53 浏览724
展开阅读全文

坑一:pom文件主要内容:注意里面 需要 使用 “exclusion”排除相关的依赖

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.7</maven.compiler.source>
    <maven.compiler.target>1.7</maven.compiler.target>
    <storm.version>1.1.1</storm.version>
    <kafka.version>0.9.0.0</kafka.version>
</properties>

<dependencies>

    <!--storm-core依赖-->
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-core</artifactId>
        <version>${storm.version}</version>
        <exclusions>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>log4j-over-slf4j</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-api</artifactId>
            </exclusion>
        </exclusions>
    </dependency>

    <!--storm-kafka 依赖-->
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-kafka</artifactId>
        <version>${storm.version}</version>
    </dependency>

    <!-- kafka 依赖-->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.11</artifactId>
        <version>${kafka.version}</version>
        <exclusions>
            <exclusion>
                <groupId>org.apache.zookeeper</groupId>
                <artifactId>zookeeper</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
        </exclusions>
    </dependency>


    <dependency>
        <groupId>commons-io</groupId>
        <artifactId>commons-io</artifactId>
        <version>2.4</version>
    </dependency>

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>${kafka.version}</version>
    </dependency>
</dependencies>

坑二: input.getBinaryByField(“bytes”); 里面一定要写成bytes,这是上游kafkaSpout 传递过来,源码中也可以看到。
对应位置如下图

这里写图片描述

业务代码体现:
public void execute(Tuple input) {
try {
byte[] bytes = input.getBinaryByField(“bytes”);
String value = new String(bytes);
System.out.println(“value ” + value);
this.collector.ack(input);
} catch (Exception e) {
e.printStackTrace();
this.collector.fail(input);
}
}

坑三:本地测试是,一直接收不到kafkaSpout发送过来的消息:
1)问题是已经连接上了kafka,也读到了对应的分区
2)推断可能是上游的数据发送不过来—》 可能原因shuffleGrouping时 的参数传递错误。
3)最终发现 原来就是SPOUT_ID 获取错了
应该将下面代码中的

     String SPOUT_ID = kafkaSpout.getClass().getSimpleName()
     替换成
     String SPOUT_ID = KafkaSpout.class.getSimpleName();
     即可。



         // kafka 使用的zk hosts
        BrokerHosts hosts = new ZkHosts("hadoop000:2181");
//        指定的kafak的一个根目录,存储的是kafkaSpout读取数据的位置信息(offset)
        SpoutConfig spoutConfig = new SpoutConfig(hosts, topicName, "/" + topicName, UUID.randomUUID().toString());

        spoutConfig.startOffsetTime = kafka.api.OffsetRequest.LatestTime(); // 设置从最近的消息开始消费

        KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
        String SPOUT_ID = kafkaSpout.getClass().getSimpleName();
        builder.setSpout(SPOUT_ID, kafkaSpout);

        String BOLD_ID = LogProcessBolt.class.getSimpleName();
        builder.setBolt(BOLD_ID, new LogProcessBolt()).shuffleGrouping(SPOUT_ID);

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("StormToKafkaTopology", new Config(), builder.createTopology());

坑四: storm重复消费kafak数据:

官网解释如下:  

这里写图片描述

代码中配置为如下即可
   SpoutConfig spoutConfig = new SpoutConfig(hosts, topicName, "/" + topicName, UUID.randomUUID().toString());

  spoutConfig.startOffsetTime = kafka.api.OffsetRequest.LatestTime(); // 设置从最近的消息开始消费

坑五: storm消费数据,ack,fail这些比配,如果出现问题还可以重试

这里写图片描述

网友评论

登录后评论
0/500
评论
bymain
+ 关注