Working with HBase from Flink

The scenario is this: we need to expose HBase as a data stream rather than as a data set. The Flink-HBase connector that ships with Flink only produces a DataSet, so the solution is to write a custom HBase source.


package com.yjp.flink.demo11;

import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.shaded.org.joda.time.DateTime;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
 * HBase as a data source.
 * Reads data from HBase and emits it as a stream.
 * Date : 9:50 2018/3/12
 */
public class HbaseSource implements SourceFunction<String> {

    private static final Logger LOG = LoggerFactory.getLogger(HbaseSource.class);

    private static final long serialVersionUID = 1;

    private volatile boolean isRunning = true;

    /**
     * Starting timestamp of the scan.
     */
    private long startTime;

    /**
     * Length of the time range queried on each poll.
     */
    private long interval;

    /**
     * Columns to query, as family:qualifier.
     */
    private ArrayList<String> columns;

    /**
     * Table to query.
     */
    private String tableName;

    public HbaseSource(long startTime, long interval, ArrayList<String> columns, String tableName) {
        this.startTime = startTime;
        this.interval = interval;
        this.columns = columns;
        this.tableName = tableName;
    }

    public HbaseSource() {
    }

    @Override
    public void run(SourceContext<String> out) {
        // First pass: catch up from startTime to (now - interval).
        if (isRunning) {
            long endTime = DateTime.now().getMillis() - interval;
            ResultScanner rs = getHbaseData(tableName, startTime, endTime - startTime, columns);
            transmitData(rs, out);
            startTime = endTime;
        }
        // Steady state: poll one interval at a time.
        while (isRunning) {
            ResultScanner rs = getHbaseData(tableName, startTime, interval, columns);
            transmitData(rs, out);
            startTime += interval;
            try {
                Thread.sleep(interval);
            } catch (InterruptedException e) {
                throw new RuntimeException("Interrupted while sleeping", e);
            }
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }

    /**
     * Scans HBase for one time window.
     *
     * @param startTime start of the time range (epoch millis)
     * @param interval  length of the time range
     * @return the matching result scanner
     */
    private ResultScanner getHbaseData(String tableName, long startTime, long interval, List<String> columns) {
        Configuration conf = HBaseConfiguration.create();
        HTable table;
        Scan scan;
        try {
            table = new HTable(conf, tableName);
            scan = new Scan();
            scan.setTimeRange(startTime, startTime + interval);
            for (String column : columns) {
                String[] columnName = column.split(":");
                scan.addColumn(Bytes.toBytes(columnName[0]), Bytes.toBytes(columnName[1]));
            }
            return table.getScanner(scan);
        } catch (IOException e) {
            throw new RuntimeException("Failed to read from HBase", e);
        }
    }

    private void transmitData(ResultScanner rs, SourceContext<String> out) {
        Result result;
        try {
            while ((result = rs.next()) != null && isRunning) {
                KeyValue[] kvs = result.raw();
                for (KeyValue kv : kvs) {
                    String value = new String(kv.getValue());
                    out.collect(value);
                }
            }
        } catch (IOException e) {
            throw new RuntimeException("Failed while iterating over the result set", e);
        }
    }
}
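
For orientation, here is a minimal sketch of how this source could be attached to a streaming job. The demo class name, table name, columns and interval are illustrative values, not taken from the article:

package com.yjp.flink.demo11;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.ArrayList;

public class HbaseSourceDemo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Columns are passed as family:qualifier, the format getHbaseData() splits on.
        ArrayList<String> columns = new ArrayList<>();
        columns.add("cf1:name");
        columns.add("cf2:amount");

        // Poll the "Orders" table in 2-second windows, starting from epoch 0.
        DataStream<String> cells = env.addSource(new HbaseSource(0L, 2000L, columns, "Orders"));
        cells.print();

        env.execute("HbaseSource demo");
    }
}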

The stream is then processed and the results are written back into HBase.

package com.yjp.flink.hbase;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;

public class HbaseToHbase {

    public static Logger logger = LoggerFactory.getLogger(HbaseToHbase.class);

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.getTableEnvironment(sEnv);
        sEnv.getConfig().disableSysoutLogging();

        // Columns read from the source table.
        List<String> getColumns = new ArrayList<String>(3);
        getColumns.add("cf1_name");
        getColumns.add("cf2_amount");
        getColumns.add("cf3_groupId");

        // Column families of the result table (only needed if the table has to be created).
        List<String> columnFamily = new ArrayList<>(3);
        columnFamily.add("cf1");
        columnFamily.add("cf2");
        columnFamily.add("cf3");

        // Columns written to the result table, as family:qualifier.
        List<String> setColumns = new ArrayList<>(3);
        setColumns.add("cf2:result");

        DataStreamSource<Orders> orderDataStream = sEnv.addSource(
                new HbaseStreamDataSource("Orders", 0L, 2000L, getColumns, Orders.class));

        DataStream<Tuple3<String, Double, Integer>> dataStream = orderDataStream.flatMap(
                new FlatMapFunction<Orders, Tuple3<String, Double, Integer>>() {
                    @Override
                    public void flatMap(Orders value, Collector<Tuple3<String, Double, Integer>> out) throws Exception {
                        out.collect(new Tuple3<String, Double, Integer>(
                                value.getCf1_name(), value.getCf2_amount(), value.getCf3_groupId()));
                    }
                });

        dataStream.keyBy(2).sum(1).addSink(
                new SinkHbase<Tuple3<String, Double, Integer>>(
                        "OrderResult", columnFamily, setColumns, "result"));

        sEnv.execute("test Hbase");
    }
}
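
The job above references two classes that the article does not reproduce: HbaseStreamDataSource (a POJO-emitting variant of the HbaseSource shown earlier) and the Orders POJO. For reference only, a hypothetical sketch of what Orders would have to look like, inferred from the getters the flatMap uses and the Tuple3<String, Double, Integer> it builds:

package com.yjp.flink.hbase;

// Hypothetical sketch of the Orders POJO referenced above; only the getters
// used by HbaseToHbase are known from the article, the rest is assumed.
public class Orders {

    private String cf1_name;
    private Double cf2_amount;
    private Integer cf3_groupId;

    public Orders() {
    }

    public String getCf1_name() {
        return cf1_name;
    }

    public void setCf1_name(String cf1_name) {
        this.cf1_name = cf1_name;
    }

    public Double getCf2_amount() {
        return cf2_amount;
    }

    public void setCf2_amount(Double cf2_amount) {
        this.cf2_amount = cf2_amount;
    }

    public Integer getCf3_groupId() {
        return cf3_groupId;
    }

    public void setCf3_groupId(Integer cf3_groupId) {
        this.cf3_groupId = cf3_groupId;
    }
}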

package com.yjp.flink.hbase;

import org.apache.flink.api.java.tuple.*;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Table;

import java.lang.reflect.Method;
import java.util.*;

/**
 * Custom sink.
 *
 * Date : 17:23 2018/3/12
 */
public class SinkHbase<T> extends RichSinkFunction<T> {

    private static final long serialVersionUID = 1L;

    /**
     * Table name.
     */
    private String tableName;

    /**
     * Column family names.
     */
    private List<String> columnFamilies;

    /**
     * Column names, passed as family:column; each column maps to one tuple field, in order.
     */
    private List<String> columns;

    /**
     * Row key.
     */
    private String rowKey;

    /**
     * @param tableName    table name
     * @param columnFamily column family names (only needed if the table does not exist yet)
     * @param columns      columns to write, as family:column
     * @param rowKey       row key to write to
     */
    public SinkHbase(String tableName, List<String> columnFamily, List<String> columns, String rowKey) {
        this.tableName = tableName;
        this.columnFamilies = columnFamily;
        this.columns = columns;
        this.rowKey = rowKey;
    }

    /**
     * @param tableName table name
     * @param columns   columns to write, as family:column
     * @param rowKey    row key to write to
     */
    public SinkHbase(String tableName, List<String> columns, String rowKey) {
        this.tableName = tableName;
        this.columns = columns;
        this.rowKey = rowKey;
    }

    public SinkHbase() {
    }

    /**
     * Opens the connection; if the table does not exist, creates it with the given column families.
     *
     * @param parameters passed to the superclass
     * @throws Exception if the connection cannot be created
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        Admin admin = FactoryConnect.getConnection().getAdmin();
        final TableName tableName1 = TableName.valueOf(tableName);
        if (!admin.tableExists(tableName1)) {
            HTableDescriptor hTableDescriptor = new HTableDescriptor(tableName1);
            for (String columnFamily : columnFamilies) {
                hTableDescriptor.addFamily(new HColumnDescriptor(columnFamily));
            }
            admin.createTable(hTableDescriptor);
        }
    }

    /**
     * Writes one record into HBase.
     *
     * @param value the incoming record
     */
    @Override
    public void invoke(T value, Context context) throws Exception {
        Map<Class, Method> map = new HashMap<>(25);
        initMap(map);
        Table table = FactoryConnect.getConnection().getTable(TableName.valueOf(tableName));
        Set<Class> keys = map.keySet();
        for (Class key : keys) {
            if (value.getClass() == key) {
                map.get(key).invoke(new AssignmentTuple(), value, rowKey, columns, table);
                return;
            }
        }
    }

    /**
     * Registers one handler method per tuple arity (Tuple1 through Tuple25).
     */
    private void initMap(Map<Class, Method> map) {
        try {
            for (int arity = 1; arity <= 25; arity++) {
                Class<?> tupleClass = Class.forName("org.apache.flink.api.java.tuple.Tuple" + arity);
                map.put(tupleClass, AssignmentTuple.class.getMethod(
                        "setTuple" + arity, tupleClass, String.class, ArrayList.class, Table.class));
            }
        } catch (ReflectiveOperationException e) {
            throw new RuntimeException("Failed to look up handler methods by reflection", e);
        }
    }
}

package com.yjp.flink.hbase;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

import java.io.IOException;
import java.io.Serializable;

/**
 * Singleton that hands out the HBase connection in a thread-safe way.
 *
 * Date : 16:45 2018/3/16
 */
public class FactoryConnect implements Serializable {

    private static volatile Connection connection;

    private FactoryConnect() {
    }

    public static Connection getConnection() throws IOException {
        if (null == connection) {
            synchronized (FactoryConnect.class) {
                if (null == connection) {
                    // Propagate configuration/connection failures instead of swallowing them.
                    org.apache.hadoop.conf.Configuration conf = HBaseConfiguration.create();
                    connection = ConnectionFactory.createConnection(conf);
                }
            }
        }
        return connection;
    }
}

package com.yjp.flink.hbase;

import org.apache.flink.api.java.tuple.*;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

/**
 * Writes the fields of a tuple into HBase.
 *
 * Date : 16:49 2018/3/12
 */
public class AssignmentTuple {

    /**
     * Handler for a 1-field tuple.
     *
     * @param tuple1  the incoming tuple
     * @param rowKey  the row key to write to
     * @param columns the columns to assign, in field order
     * @param table   the Table to put into
     */
    public void setTuple1(Tuple1<Object> tuple1, String rowKey, ArrayList<String> columns, Table table) {
        putData(tuple1, rowKey, columns, table);
    }

    public void setTuple2(Tuple2<Object, Object> tuple2, String rowKey, ArrayList<String> columns, Table table) {
        putData(tuple2, rowKey, columns, table);
    }

    public void setTuple3(Tuple3<Object, Object, Object> tuple3, String rowKey, ArrayList<String> columns, Table table) {
        putData(tuple3, rowKey, columns, table);
    }

    public void setTuple4(Tuple4<Object, Object, Object, Object> tuple4, String rowKey, ArrayList<String> columns, Table table) {
        putData(tuple4, rowKey, columns, table);
    }

    public void setTuple5(Tuple5<Object, Object, Object, Object, Object> tuple5, String rowKey, ArrayList<String> columns, Table table) {
        putData(tuple5, rowKey, columns, table);
    }

    public void setTuple6(Tuple6 tuple6, String rowKey, ArrayList<String> columns, Table table) {
        putData(tuple6, rowKey, columns, table);
    }

    public void setTuple7(Tuple7 tuple7, String rowKey, ArrayList<String> columns, Table table) {
        putData(tuple7, rowKey, columns, table);
    }

    public void setTuple8(Tuple8 tuple8, String rowKey, ArrayList<String> columns, Table table) {
        putData(tuple8, rowKey, columns, table);
    }

    public void setTuple9(Tuple9 tuple9, String rowKey, ArrayList<String> columns, Table table) {
        putData(tuple9, rowKey, columns, table);
    }

    public void setTuple10(Tuple10 tuple10, String rowKey, ArrayList<String> columns, Table table) {
        putData(tuple10, rowKey, columns, table);
    }

    public void setTuple11(Tuple11 tuple11, String rowKey, ArrayList<String> columns, Table table) {
        putData(tuple11, rowKey, columns, table);
    }

    public void setTuple12(Tuple12 tuple12, String rowKey, ArrayList<String> columns, Table table) {
        putData(tuple12, rowKey, columns, table);
    }

    public void setTuple13(Tuple13 tuple13, String rowKey, ArrayList<String> columns, Table table) {
        putData(tuple13, rowKey, columns, table);
    }

    public void setTuple14(Tuple14 tuple14, String rowKey, ArrayList<String> columns, Table table) {
        putData(tuple14, rowKey, columns, table);
    }

    public void setTuple15(Tuple15 tuple15, String rowKey, ArrayList<String> columns, Table table) {
        putData(tuple15, rowKey, columns, table);
    }

    public void setTuple16(Tuple16 tuple16, String rowKey, ArrayList<String> columns, Table table) {
        putData(tuple16, rowKey, columns, table);
    }

    public void setTuple17(Tuple17 tuple17, String rowKey, ArrayList<String> columns, Table table) {
        putData(tuple17, rowKey, columns, table);
    }

    public void setTuple18(Tuple18 tuple18, String rowKey, ArrayList<String> columns, Table table) {
        putData(tuple18, rowKey, columns, table);
    }

    public void setTuple19(Tuple19 tuple19, String rowKey, ArrayList<String> columns, Table table) {
        putData(tuple19, rowKey, columns, table);
    }

    public void setTuple20(Tuple20 tuple20, String rowKey, ArrayList<String> columns, Table table) {
        putData(tuple20, rowKey, columns, table);
    }

    public void setTuple21(Tuple21 tuple21, String rowKey, ArrayList<String> columns, Table table) {
        putData(tuple21, rowKey, columns, table);
    }

    public void setTuple22(Tuple22 tuple22, String rowKey, ArrayList<String> columns, Table table) {
        putData(tuple22, rowKey, columns, table);
    }

    public void setTuple23(Tuple23 tuple23, String rowKey, ArrayList<String> columns, Table table) {
        putData(tuple23, rowKey, columns, table);
    }

    public void setTuple24(Tuple24 tuple24, String rowKey, ArrayList<String> columns, Table table) {
        putData(tuple24, rowKey, columns, table);
    }

    public void setTuple25(Tuple25 tuple25, String rowKey, ArrayList<String> columns, Table table) {
        putData(tuple25, rowKey, columns, table);
    }

    /**
     * Assigns the tuple fields to the columns one to one and writes the row.
     *
     * @param tuple   the tuple to write
     * @param rowKey  the row key
     * @param columns the target columns, as family:column
     * @param table   the Table to put into
     */
    public void putData(Tuple tuple, String rowKey, List<String> columns, Table table) {
        Put put = new Put(Bytes.toBytes(rowKey));
        long timeStamp = Instant.now().toEpochMilli();
        for (int i = 0; i < columns.size(); i++) {
            String[] split = columns.get(i).split(":");
            put.addColumn(Bytes.toBytes(split[0]), Bytes.toBytes(split[1]), timeStamp, Bytes.toBytes(tuple.getField(i).toString()));
        }
        try {
            table.put(put);
        } catch (IOException e) {
            throw new RuntimeException("Failed to write to HBase", e);
        }
    }
}

To keep the source and the sink generic, reflection is used to dispatch on the tuple type at runtime.
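
To make that dispatch idea concrete in isolation, here is a standalone, self-contained sketch; the class and method names are illustrative and not part of the article's code, but the lookup mirrors what SinkHbase.invoke() does: register one handler per tuple arity, then select it by the value's runtime class.

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;

import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

public class ReflectiveDispatchDemo {

    public void handleTuple2(Tuple2<Object, Object> t) {
        System.out.println("2-field tuple: " + t);
    }

    public void handleTuple3(Tuple3<Object, Object, Object> t) {
        System.out.println("3-field tuple: " + t);
    }

    public static void main(String[] args) throws Exception {
        // Register one handler method per tuple arity.
        Map<Class<?>, Method> handlers = new HashMap<>();
        handlers.put(Tuple2.class, ReflectiveDispatchDemo.class.getMethod("handleTuple2", Tuple2.class));
        handlers.put(Tuple3.class, ReflectiveDispatchDemo.class.getMethod("handleTuple3", Tuple3.class));

        Object value = Tuple3.of("milk", 12.5, 7);
        // Same lookup as SinkHbase.invoke(): runtime class -> registered handler.
        handlers.get(value.getClass()).invoke(new ReflectiveDispatchDemo(), value);
    }
}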
