使用E-MapReduce提交Storm作业处理Kafka数据

简介: 本文演示如何在E-MapReduce上部署Storm集群和Kafka集群,并运行Storm作业消费Kafka数据。 环境准备 本文选择在杭州Region进行测试,版本选择EMR-3.

本文演示如何在E-MapReduce上部署Storm集群和Kafka集群,并运行Storm作业消费Kafka数据。

环境准备

本文选择在杭州Region进行测试,版本选择EMR-3.8.0,本次测试需要的组件版本有:

  • Kafka:2.11_1.0.0
  • Storm: 1.0.1

本文使用阿里云EMR服务自动化搭建Kafka集群,详细过程请参考创建集群

以上版本依赖包经过测试可用,如果你再测试过程中引入了其他依赖,也一同添加在Storm lib中,具体操作如下:
153811638912659_zh-CN.png
上述操作需要在Kafka集群的每台机器执行一遍。执行完在E-MapReduce控制台重启Storm服务,如下:
153811639012660_zh-CN.png
查看操作历史,待Storm重启完毕:
153811639012661_zh-CN.png

开发Storm和Kafka作业

E-MapReduce已经提供了现成的示例代码,直接使用即可,地址如下:
e-mapreduce-demo
e-mapreduce-sdk
Topic数据准备
登录到Kafka集群
创建一个test topic,分区数10,副本数2

/usr/lib/kafka-current/bin/kafka-topics.sh --partitions 10 --replication-factor 2 --zookeeper emr-header-1:/kafka-1.0.0 --topic test --create

向test topic写入100条数据

/usr/lib/kafka-current/bin/kafka-producer-perf-test.sh --num-records 100 --throughput 10000 --record-size 1024 --producer-props bootstrap.servers=emr-worker-1:9092 --topic test

说明 以上命令在kafka集群的emr-header-1节点执行,当然也可以客户端机器上执行。
运行Storm作业
登录到Hadoop集群,将第二步中编译得到的
examples-1.1-shaded.jar拷贝到集群emr-header-1上,这里我放在root根目录下面。提交作业:

/usr/lib/storm-current/bin/storm jar examples-1.1-shaded.jar com.aliyun.emr.example.storm.StormKafkaSample test aaa.bbb.ccc.ddd hdfs://emr-header-1:9000 sample

查看作业运行

  • 查看Storm运行状态
    查看集群上服务的WebUI有2种方式:

    本文选择使用SSH隧道方式,访问地址:
    http://localhost:9999/index.html 。可以看到我们刚刚提交的Topology。点进去可以看到执行详情:
    153811639012663_zh-CN.png

    • 查看HDFS输出
    • 查看HDFS文件输出
    [root@emr-header-1 ~]# hadoop fs -ls /foo/
    -rw-r--r--   3 root hadoop     615000 2018-02-11 13:37 /foo/bolt-2-0-1518327393692.txt
    -rw-r--r--   3 root hadoop     205000 2018-02-11 13:37 /foo/bolt-2-0-1518327441777.txt
    [root@emr-header-1 ~]# hadoop fs -cat /foo/bolt-2-0-1518327441777.txt | wc -l
    200

    向kafka写120条数

    [root@emr-header-1 ~]# /usr/lib/kafka-current/bin/kafka-producer-perf-test.sh --num-records 120 --throughput 10000 --record-size 1024 --producer-props bootstrap.servers=emr-worker-1:9092 --topic test
    120 records sent, 816.326531 records/sec (0.80 MB/sec), 35.37 ms avg latency, 134.00 ms max latency, 35 ms 50th, 39 ms 95th, 41 ms 99th, 134 ms 99.9th

    查看HDFS文件输出

    [root@emr-header-1 ~]# hadoop fs -cat /foo/bolt-2-0-1518327441777.txt | wc -l
    320

    总结

    至此,我们成功实现了在E-MapReduce上部署一套Storm集群和一套Kafka集群,并运行Storm作业消费Kafka数据。当然,E-MapReduce也支持Spark Streaming和Flink组件,同样可以方便在Hadoop集群上运行,处理Kafka数据。

    说明
    由于E-MapReduce没有单独的Storm集群类别,所以我们是创建的Hadoop集群,并安装了Storm组件。如果你在使用过程中用不到其他组件,可以很方便地在E-MapReduce管理控制台将那些组件停掉。这样,可以将Hadoop集群作为一个纯粹的Storm集群使用。
    (本文作者为阿里云大数据产品文档工程师)

相关文章
|
3月前
|
消息中间件 JSON druid
Druid:通过 Kafka 加载流数据
Druid:通过 Kafka 加载流数据
36 0
|
5天前
|
消息中间件 存储 算法
深入了解Kafka的数据持久化机制
深入了解Kafka的数据持久化机制
24 0
|
2月前
|
消息中间件 Kafka Apache
Flink 提供了与 Kafka 集成的官方 Connector,使得 Flink 能够消费 Kafka 数据
【2月更文挑战第6天】Flink 提供了与 Kafka 集成的官方 Connector,使得 Flink 能够消费 Kafka 数据
69 2
|
1月前
|
消息中间件 存储 缓存
Kafka【基础知识 02】集群+副本机制+数据请求+物理存储+数据存储设计(图片来源于网络)
【2月更文挑战第20天】Kafka【基础知识 02】集群+副本机制+数据请求+物理存储+数据存储设计(图片来源于网络)
25 1
|
2月前
|
分布式计算 资源调度 Hadoop
Flink报错问题之Sql往kafka表写聚合数据报错如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
|
2月前
|
机器学习/深度学习 消息中间件 人工智能
机器学习PAI报错问题之读取kafka数据报错如何解决
人工智能平台PAI是是面向开发者和企业的机器学习/深度学习工程平台,提供包含数据标注、模型构建、模型训练、模型部署、推理优化在内的AI开发全链路服务;本合集将收录PAI常见的报错信息和解决策略,帮助用户迅速定位问题并采取相应措施,确保机器学习项目的顺利推进。
|
2月前
|
SQL 消息中间件 关系型数据库
Flink CDC数据同步问题之向kafka同步数据报错如何解决
Flink CDC数据同步是指利用Flink CDC实现不同数据源之间的实时数据同步任务;本合集旨在提供Flink CDC数据同步的操作指南、性能优化建议和常见问题处理,助力用户高效实施数据同步。
|
8月前
|
数据采集 分布式计算 搜索推荐
Hadoop学习---7、OutputFormat数据输出、MapReduce内核源码解析、Join应用、数据清洗、MapReduce开发总结(一)
Hadoop学习---7、OutputFormat数据输出、MapReduce内核源码解析、Join应用、数据清洗、MapReduce开发总结(一)
|
8月前
|
数据采集 缓存 分布式计算
Hadoop学习---7、OutputFormat数据输出、MapReduce内核源码解析、Join应用、数据清洗、MapReduce开发总结(二)
Hadoop学习---7、OutputFormat数据输出、MapReduce内核源码解析、Join应用、数据清洗、MapReduce开发总结(二)
|
8月前
|
分布式计算 Hadoop 数据处理
Hadoop基础学习---6、MapReduce框架原理(二)
Hadoop基础学习---6、MapReduce框架原理(二)

热门文章

最新文章