大数据项目实战之新闻话题的实时统计分析

  1. 云栖社区>
  2. 博客>
  3. 正文

大数据项目实战之新闻话题的实时统计分析

liuyanling41 2018-03-20 21:23:03 浏览6179
展开阅读全文

前言:本文是一个完整的大数据项目实战,实时|离线统计分析用户的搜索话题,并用酷炫的前端界面展示出来。这些指标对网站的精准营销、运营都有极大帮助。架构大致是按照企业标准来的,从日志的采集、转化处理、实时计算、JAVA后台开发、WEB前端展示,一条完整流程线下来,甚至每个节点都用的高可用架构,都考虑了故障转移和容错性。所用到的框架包括:Hadoop(HDFS+MapReduce+Yarn)+Flume+KafKa+Hbase+Hive+Spark(SQL、Structured Streaming )+Hue+Mysql+SpringMVC+Mybatis+Websocket+AugularJs+Echarts。所涉及到的语言包括:JAVA、Scala、Shell
由于本文并非零基础教学,所以只讲架构和流程,基础性知识自行查缺补漏。Github已经上传完整项目代码:liuyanling41-Github

最终效果图如下:

view

项目架构图如下:

_

环境准备

image

模拟网站实时产生日志信息

  • 获取数据源,本文是利用搜狗的数据:搜狗实验室
  • 编写java类模拟实时采集网站日志。主要利用Java中的输入输出流。写好后打成jar包传到服务器上
public class ReadWebLog {

    private static String readFileName;
    private static String writeFileName;

    public static void main(String args[]) {
        readFileName = args[0];
        writeFileName = args[1];
        readFile(readFileName);

    }

    public static void readFile(String fileName) {

        try {
            FileInputStream fis = new FileInputStream(fileName);
            InputStreamReader isr = new InputStreamReader(fis, "GBK");
            //以上两步已经可以从文件中读取到一个字符了,但每次只读取一个字符不能满足大数据的需求。故需使用BufferedReader,它具有缓冲的作用,可以一次读取多个字符
            BufferedReader br = new BufferedReader(isr);
            int count = 0;
            while (br.readLine() != null) {
                String line = br.readLine();
                count++;
                // 显示行号
                Thread.sleep(300);
                String str = new String(line.getBytes("UTF8"), "GBK");
                System.out.println("row:" + count + ">>>>>>>>" + line);
                writeFile(writeFileName, line);
            }
            isr.close();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }


    public static void writeFile(String fileName, String conent) {
        try {
            FileOutputStream fos = new FileOutputStream(fileName, true);
            OutputStreamWriter osw = new OutputStreamWriter(fos);
       
            BufferedWriter bw = new BufferedWriter(osw);
            bw.write("\n");
            bw.write(conent);
            bw.close();
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

image

  • 编写采集日志的shell脚本
    vim weblog.sh
#/bin/bash
echo "start log"
java -jar /home/weblog.jar /usr/local/weblog.log /home/weblogs.log
  • 运行效果图_

Flume Agent2采集日志信息

主要通过设置Source、Channel、Sink来完成日志采集。

  • 配置flume配置文件 vim agent2.conf
a2.sources = r2
a2.channels = c2
a2.sinks = k2

a2.sources.r2.type = exec
#来源于weblogs.log文件
a2.sources.r2.command = tail -F /home/weblogs.log
a2.sources.r2.channels = c2

a2.channels.c2.type = memory
a2.channels.c2.capacity = 10000
a2.channels.c2.transactionCapacity = 100
a2.channels.c2.keep-alive = 10

a2.sinks.k2.type = avro
a2.sinks.k2.channel = c2
# 落地点是master机器的5555端口(主机名和端口号都必须与master机器的flume配置保持一致)
a2.sinks.k2.hostname = master
a2.sinks.k2.port = 5555
  • 编写shell脚本,方便运行。vim flume.sh
#/bin/bash
echo "flume agent2 start"
bin/flume-ng agent --conf conf --name a2 --conf-file conf/agent2.conf -Dflume.root.logger=INFO,console
  • 运行的时候直接 ./flume.sh 即可

Flume Agent3采集日志信息

各方面配置都和Agent2完全一样、省略。

Flume Agent1整合日志信息

  • vim agent1.conf
#Flume Agent1实时整合日志信息

a1.sources = r1
a1.channels = kafkaC hbaseC
a1.sinks = kafkaS hbaseS

# flume + hbase
a1.sources.r1.type = avro
a1.sources.r1.channels = kafkaC hbaseC
a1.sources.r1.bind = master
a1.sources.r1.port = 5555

a1.channels.hbaseC.type = memory
a1.channels.hbaseC.capacity = 10000
a1.channels.hbaseC.transactionCapacity = 10000


a1.sinks.hbaseS.type = asynchbase
a1.sinks.hbaseS.table = weblogs
a1.sinks.hbaseS.columnFamily = info
a1.sinks.hbaseS.serializer = org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer
a1.sinks.hbaseS.serializer.payloadColumn = datatime,userid,searchname,retorder,cliorder,cliurl
a1.sinks.hbaseS.channel = hbaseC

# flume + kafka
a1.channels.kafkaC.type = memory
a1.channels.kafkaC.capacity = 10000
a1.channels.kafkaC.transactionCapacity = 10000

a1.sinks.kafkaS.channel = kafkaC
a1.sinks.kafkaS.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.kafkaS.topic = weblogs
a1.sinks.kafkaS.brokerList = master:9092,slave1:9092,slave2:9092
a1.sinks.kafkaS.zookeeperConnect = master:2181,slave1:2181,slave2:2181
a1.sinks.kafkaS.requiredAcks = 1
a1.sinks.kafkaS.batchSize = 20
a1.sinks.kafkaS.serializer.class = kafka.serializer.StringEncoder
  • vim flume.sh
#/bin/bash
echo "flume agent1 start"
bin/flume-ng agent --conf conf --name a1 --conf-file conf/agent1.conf -Dflume.root.logger=INFO,console

具体讲解如下:

Flume与Hbase的集成

  • 通过查看官方文档可知,Flume与Hbase的集成主要需要如下参数,表名、列簇名、以及Java类SimpleAsyncHbaseEventSerializer。

Flume官网

  • 改写SimpleAsyncHbaseEventSerializer
    下载Flume源码,需要改写如下两个Java类.

Flume源码
image
image

  • 打成jar包,上传到linux服务器中替换原有flume目录的该jar包
    image

image

  • Flume配置文件配置Sink为Hbase
a1.sinks.hbaseS.type = asynchbase
a1.sinks.hbaseS.table = weblogs
a1.sinks.hbaseS.columnFamily = info
a1.sinks.hbaseS.serializer = org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer
a1.sinks.hbaseS.serializer.payloadColumn = datatime,userid,searchname,retorder,cliorder,cliurl
a1.sinks.hbaseS.channel = hbaseC

Flume与Kafka的集成

  • Flume配置文件:主要配置topic、brokerlist:

image

a1.sinks.kafkaS.channel = kafkaC
a1.sinks.kafkaS.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.kafkaS.topic = weblogs
a1.sinks.kafkaS.brokerList = master:9092,slave1:9092,slave2:9092
a1.sinks.kafkaS.zookeeperConnect = master:2181,slave1:2181,slave2:2181
a1.sinks.kafkaS.requiredAcks = 1
a1.sinks.kafkaS.batchSize = 20
a1.sinks.kafkaS.serializer.class = kafka.serializer.StringEncoder
  • 编写kafka消费端脚本,消费从flume传过来的信息。
    vim flume.sh
#/bin/bash
echo "flume agent1 start"
bin/kafka-console-consumer.sh --zookeeper master:2181,slave1:2181,slave2:2181 --topic weblogs  --from-beginning
  • 运行效果图

kafka_flume_

Kafka与Spark集成完成数据实时处理

这里我选择的是2.2版本中的StructuredStreaming,因为它相比SparkStreaming而言有很多优势,它的出现重点就是解决端到端的精确一次语义,保证数据的不丢失不重复,这对于流式计算极为重要。StructuredStreaming的输入源为kafka,spark对来自kafka的数据进行计算,主要就是累加话题量和访问量。具体代码参考github。

    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("streaming").getOrCreate()

    val df = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "master:9092")
      .option("subscribe", "weblogs")
      .load()

    import spark.implicits._
    val lines = df.selectExpr("CAST(value AS STRING)").as[String]
    val weblog = lines.map(_.split(",")).map(x => Weblog(x(0), x(1), x(2), x(3), x(4), x(5)))
    val titleCount = weblog.groupBy("searchname").count().toDF("titleName", "webcount")

Spark与Mysql集成

这里选择Mysql是因为,我们的需求只是报表展示,需要在前台展示的字段并不多,关系型数据库完全能够支撑。在Hbase里有几百万条数据(一个浏览话题可能有十几万人搜索过,也就是说一个话题就有十几万条数据,这么大量数据当然要存在Hbase中),而经过spark的计算,这十几万条数据在mysql中就变成了一条数据(XXX话题,XXX浏览量)。
如果业务需求变了,我需要实时查询用户各种信息(数据量很大,字段很多),那么当然就是实时的直接从Hbase里查,而不会在Mysql中。
所以企业中要根据不同的业务需求,充分考虑数据量等问题,进行架构的选择。

    val url = "jdbc:mysql://master:3306/weblog?useSSL=false"
    val username = "root"
    val password = "123456"

    val writer = new JdbcSink(url, username, password)
    val weblogcount = titleCount.writeStream
      .foreach(writer)
      .outputMode("update")
      .start()

    weblogcount.awaitTermination()
 

离线分析:HIVE集成HBASE。

我们知道Hive是一个数据仓库,主要就是转为MapReduce完成对大量数据的离线分析和决策。之前我们已经用Flume集成Hbase,使得Hbase能源源不断的插入数据。那么我们直接将HIVE集成HBase,这样只要Hbase有数据了,那Hive表也就有数据了。怎么集成呢?很简单,用【外部表】就搞定了。

CREATE EXTERNAL TABLE `weblogs`(
  `id` string COMMENT 'from deserializer', 
  `datatime` string COMMENT 'from deserializer', 
  `userid` string COMMENT 'from deserializer', 
  `searchname` string COMMENT 'from deserializer', 
  `retorder` string COMMENT 'from deserializer', 
  `cliorder` string COMMENT 'from deserializer', 
  `cliurl` string COMMENT 'from deserializer')
ROW FORMAT SERDE 
  'org.apache.hadoop.hive.hbase.HBaseSerDe' 
STORED BY 
  'org.apache.hadoop.hive.hbase.HBaseStorageHandler' 
WITH SERDEPROPERTIES ( 
  'hbase.columns.mapping'=':key,info:datatime,info:userid,info:searchname,info:retorder,info:cliorder,info:cliurl', 
  'serialization.format'='1')
TBLPROPERTIES (
  'COLUMN_STATS_ACCURATE'='false', 
  'hbase.table.name'='weblogs', 
  'numFiles'='0', 
  'numRows'='-1', 
  'rawDataSize'='-1', 
  'totalSize'='0', 
  'transient_lastDdlTime'='1518778031')

验证一下HBASE和HIVE是不是同步的:

image
image

好了现在我们可以在Hive中尽情的离线分析和决策了~~~

SpringMVC+Mybatis完成对mysql数据的查询

个人觉得传统JDBC实在是太笨重,还是最喜欢Spring整合Mybatis对数据库进行操作。这里主要完成的操作就是对mysql的数据进行查询。详情请参考github,地址文章开头已给出。
image

WebSocket实现全双工通信

既然要实现客户端实时接收服务器端的消息,而服务器端又实时接收客户端的消息,必不可少的就是WebSocket了,WebSocket实现了浏览器与服务器全双工通信(full-duple),能更好的节省服务器资源和带宽并达到实时通讯。WebSocket用HTTP握手之后,服务器和浏览器就使用这条HTTP链接下的TCP连接来直接传输数据,抛弃了复杂的HTTP头部和格式。一旦WebSocket通信连接建立成功,就可以在全双工模式下在客户端和服务器之间来回传送WebSocket消息。即在同一时间、任何方向,都可以全双工发送消息。WebSocket 核心就是OnMessage、OnOpen、OnClose,本项目使用的是和Spring集成的方式,因此需要有configurator = SpringConfigurator.class。

@ServerEndpoint(value = "/websocket", configurator = SpringConfigurator.class)
public class WebSocket {
    @Autowired
    private WebLogService webLogService;
    @OnMessage
    public void onMessage(String message, Session session) throws IOException, InterruptedException {
        String[] titleNames = new String[10];
        Long[] titleCounts = new Long[10];
        Long[] titleSum = new Long[1];
        while (true) {
            Map<String, Object> map = new HashMap<String, Object>();
            List<WebLogBO> list = webLogService.webcount();
            System.out.print(list);
            for (int i = 0; i < list.size(); i++) {
                titleNames[i] = list.get(i).getTitleName();
                titleCounts[i] = list.get(i).getWebcount();
            }
            titleSum[0] = webLogService.websum();
            map.put("titleName", titleNames);
            map.put("titleCount", titleCounts);
            map.put("titleSum", titleSum);
            System.out.print(map);
            session.getBasicRemote().sendText(JSON.toJSONString(map));
            Thread.sleep(1000);
            map.clear();
        }
    }

    @OnOpen
    public void onOpen() {
        System.out.println("Client connected");
    }

    @OnClose
    public void onClose() {
        System.out.println("Connection closed");
    }
}

Echarts完成前端界面展示

大家可以看到开头给出的项目效果图还是蛮漂亮的,其实非常简单,就是用的Echarts这个框架。直接给它传值就ok了,其他前端那些事它都给你搞定了。详情请参考github,地址文章开头已给出。

        function webcount(json) {
            var option = {
                title: {
                    text: '搜狗新闻热点实时统计',
                    subtext: '作者:刘彦伶'
                },
                tooltip: {
                    trigger: 'axis',
                    axisPointer: {
                        type: 'shadow'
                    }
                },
                legend: {
                    data: ['浏览量']
                },
                grid: {
                    left: '3%',
                    right: '4%',
                    bottom: '3%',
                    containLabel: true
                },
                xAxis: {
                    type: 'value',
                    boundaryGap: [0, 0.01]
                },
                yAxis: {
                    type: 'category',
                    data: json.titleName
                },
                series: [
                    {
                        name: '浏览量',
                        type: 'bar',
                        data: json.titleCount
                    },

                ]
            };
            countchart.setOption(option);
        }

本文讲解的比较粗糙,有很多细节的东西,毕竟一整个项目不可能用一篇文章说清楚。。。所以实践的东西需要读者自己去领悟,但是架构、环境搭建、方法、流程还是很有参考价值的!

网友评论

登录后评论
0/500
评论
liuyanling41
+ 关注