基于Hadoop平台的电信客服数据的处理与分析④项目实现:任务16:数据采集/消费/存储

任务描述

“数据生产”的程序启动后,会持续向callLog.csv文件中写入模拟的通话记录。接下来,我们需要将这些实时的数据通过Flume采集到Kafka集群中,然后提供给HBase消费。
Flume:是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统,Flume支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力。适合下游数据消费者不多的情况,适合数据安全性要求不高的操作,适合与Hadoop生态圈对接的操作。
Kafka:由Apache软件基金会开发的一个开源流处理平台。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。适合下游数据消费者较多的情况,适合数据安全性要求较高的操作。
在这里采用一种常见的组合模型:线上数据----Flume----Kafka----HBase(HDFS)。

任务指导

“数据生产”的程序启动后,会持续向callLog.csv文件中写入模拟的通话记录。接下来将这些实时的数据通过Flume采集到Kafka集群中,然后消费Kafka中的数据并存储到HBase中。

“数据采集/消费/存储”模块的流程图:

image.png

思路:  数据采集

1.   启动ZooKeeper和Kafka集群;

2.   创建Kafka主题;

3.   配置Flume,监控日志文件或目录;

4.   启动Flume收集数据发送到Kafka;

5.   运行在“生产数据”小节中创建的日志生产脚本;

6.   启动Kafka控制台消费者,用于测试对Kafka数据的消费;

7.   编写Kafka的消费者代码,将Kafka中的数据存储到HBase中涉及的类:

  1))HBaseConsumer消费Kafka中的数据存储到HBase

  2)PropertiesUtil提取项目所需参数的辅助类 

  3)HBaseUtil封装HBase的DDL操作

  4)HBaseDAO执行HBase具体执行DDL操作

  5)ConnectionInstance与HBase建立连接

任务实现

1、启动ZooKeeper和Kafka集群

确定在master1、slave1、slave2启动ZooKeeper集群,如未启动通过以下命令启动: 

[root@master1 ~]# zkServer.sh start
[root@slave1 ~]# zkServer.sh start
[root@slave2 ~]# zkServer.sh start

确定Kafka在master1已启动,如未启动通过以下命令启动:

[root@master1 ~]# kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties

2、创建Kafka主题: 

[root@master1 ~]# kafka-topics.sh --create --zookeeper master1:2181,slave1:2181,slave2:2181 --replication-factor 1 --partitions 4 --topic calllog

查看创建的Kafka主题: 

[root@master1 ~]# kafka-topics.sh --describe --zookeeper slave1:2181,slave2:2181,slave3:2181

3、配置Flume监控数据传送至Kafka主题

在master1进入Flume配置目录新建kafka-conf.properties文件

[root@master1 ~]# cd $FLUME_HOME/conf
[root@master1 conf]# touch kafka-conf.properties 

编辑kafka-conf.properties文件,配置内容如下:

exec-memory-kafka.sources = exec-source
exec-memory-kafka.channels = memory-channel
exec-memory-kafka.sinks = kafka-sinkexec-memory-kafka.sources.exec-source.type=exec
exec-memory-kafka.sources.exec-source.command=tail -F /opt/app/callLog.csv
exec-memory-kafka.sources.exec-source.channels=memory-channelexec-memory-kafka.channels.memory-channel.type=memory
exec-memory-kafka.channels.memory-channel.capacity=10000
exec-memory-kafka.channels.memory-channel.transactionCapacity=100exec-memory-kafka.sinks.kafka-sink.type= org.apache.flume.sink.kafka.KafkaSink
exec-memory-kafka.sinks.kafka-sink.brokerList=master1:9092
exec-memory-kafka.sinks.kafka-sink.topic=calllog
exec-memory-kafka.sinks.kafka-sink.serializer.class=kafka.serializer.StringEncoder
exec-memory-kafka.sinks.kafka-sink.channel=memory-channel

4、启动Flume收集数据后发送至Kafka

[root@master1 conf]# cd $FLUME_HOME
[root@master1 apache-flume-1.9.0-bin]# flume-ng agent -c ./conf/ -f ./conf/kafka-conf.properties -n exec-memory-kafka -Dflume.root.logger=INFO,console

5、运行在任务15创建的日志生产脚本

[root@master1 ~]# cd /opt/app/
[root@master1 app]# nohup sh productlog.sh &

6、启动Kafka控制台消费者,测试Flume信息的输入: 

[root@slave1 ~]# kafka-console-consumer.sh --bootstrap-server master1:9092 --from-beginning --topic calllog

7、接下来编写操作HBase代码,用于消费Kafka数据,并将数据实时存储在HBase中,思路如下:

a)   编写Kafka消费者,读取Kafka集群中缓存的信息,并打印到控制台;

b)   如果读取Kafka中的数据可以打印到控制台,那么就可以编写调用HBase API的方法,将从Kafka中读取出来的数据写入到HBase;

c)   编写一些通用类,用于封装HBase操作的一些通用方法。

1)   创建新的Maven项目

参考任务“数据生产”创建Maven项目ct_consumer

pom.xml文件配置:

<project xmlns="http://maven.apache.org/POM/4.0.0"   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0   http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.qst </groupId><artifactId>ct_consumer</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target></properties><dependencies><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.12</version><scope>test</scope></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.4.1</version></dependency><dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-client</artifactId><version>2.3.5</version></dependency><dependency><groupId>org.apache.hbase</groupId><artifactId>hbase</artifactId><version>2.3.5</version><type>pom</type></dependency><dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-server</artifactId><version>2.3.5</version></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-surefire-plugin</artifactId><version>2.12.4</version><configuration><skipTests>true</skipTests></configuration></plugin></plugins></build>
</project>

如图为项目创建对应的包和类

2)   HBaseConsumer类:主要用于消费Kafka中缓存的数据,然后调用HBase API持久化数据,将数据保存到HBase 

package kafka;import hbase.HBaseDAO;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import utils.PropertiesUtil;import java.util.Arrays;public class HBaseConsumer {public static void main(String[] args) {KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(PropertiesUtil.properties);kafkaConsumer.subscribe(Arrays.asList(PropertiesUtil.getProperty("kafka.topics")));HBaseDAO hd = new HBaseDAO();while(true){ConsumerRecords<String, String> records = kafkaConsumer.poll(100);for(ConsumerRecord<String, String> cr : records){String oriValue = cr.value();System.out.println(oriValue);hd.put(oriValue);}}}
}

3)   PropertiesUtil类:以解耦合的方式,从文件中读取项目所需的参数,方便进行配置 

package utils;import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;public class PropertiesUtil {public static Properties properties = null;static{InputStream is = ClassLoader.getSystemResourceAsStream("hbase_consumer.properties");properties = new Properties();try {properties.load(is);} catch (IOException e) {e.printStackTrace();}}public static String getProperty(String key){return properties.getProperty(key);}
}

在resources目录下创建hbase_consumer.properties文件,并配置如下

# 设置kafka的brokerlist
bootstrap.servers=master1:9092
# 设置消费者所属的消费组
group.id=hbase_consumer_group
# 设置是否自动确认offset
enable.auto.commit=true
# 自动确认offset的时间间隔
auto.commit.interval.ms=30000
# 设置key,value的反序列化类的全名
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer# 以下为自定义属性设置
# 设置本次消费的主题
kafka.topics=calllog# 设置HBase的一些变量
hbase.zookeeper.quorum=slave1:2181,slave2:2181,slave3:2181
hbase.calllog.regions=3
hbase.calllog.namespace=ns_ct
hbase.calllog.tablename=ns_ct:calllog
hbase.regions.count=3

在resources目录下创建log4j.properties文件,并配置如下

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.# Define some default values that can be overridden by system properties
hbase.root.logger=INFO,console
hbase.security.logger=INFO,console
hbase.log.dir=.
hbase.log.file=hbase.log# Define the root logger to the system property "hbase.root.logger".
log4j.rootLogger=${hbase.root.logger}# Logging Threshold
log4j.threshold=ALL#
# Daily Rolling File Appender
#
log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender
log4j.appender.DRFA.File=${hbase.log.dir}/${hbase.log.file}# Rollver at midnight
log4j.appender.DRFA.DatePattern=.yyyy-MM-dd# 30-day backup
#log4j.appender.DRFA.MaxBackupIndex=30
log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout# Pattern format: Date LogLevel LoggerName LogMessage
log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %c{2}: %m%n# Rolling File Appender properties
hbase.log.maxfilesize=256MB
hbase.log.maxbackupindex=20# Rolling File Appender
log4j.appender.RFA=org.apache.log4j.RollingFileAppender
log4j.appender.RFA.File=${hbase.log.dir}/${hbase.log.file}log4j.appender.RFA.MaxFileSize=${hbase.log.maxfilesize}
log4j.appender.RFA.MaxBackupIndex=${hbase.log.maxbackupindex}log4j.appender.RFA.layout=org.apache.log4j.PatternLayout
log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %c{2}: %m%n#
# Security audit appender
#
hbase.security.log.file=SecurityAuth.audit
hbase.security.log.maxfilesize=256MB
hbase.security.log.maxbackupindex=20
log4j.appender.RFAS=org.apache.log4j.RollingFileAppender
log4j.appender.RFAS.File=${hbase.log.dir}/${hbase.security.log.file}
log4j.appender.RFAS.MaxFileSize=${hbase.security.log.maxfilesize}
log4j.appender.RFAS.MaxBackupIndex=${hbase.security.log.maxbackupindex}
log4j.appender.RFAS.layout=org.apache.log4j.PatternLayout
log4j.appender.RFAS.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
log4j.category.SecurityLogger=${hbase.security.logger}
log4j.additivity.SecurityLogger=false
#log4j.logger.SecurityLogger.org.apache.hadoop.hbase.security.access.AccessController=TRACE
#log4j.logger.SecurityLogger.org.apache.hadoop.hbase.security.visibility.VisibilityController=TRACE#
# Null Appender
#
log4j.appender.NullAppender=org.apache.log4j.varia.NullAppender#
# console
# Add "console" to rootlogger above if you want to use this
#
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %c{2}: %m%nlog4j.appender.asyncconsole=org.apache.hadoop.hbase.AsyncConsoleAppender
log4j.appender.asyncconsole.target=System.err# Custom Logging levelslog4j.logger.org.apache.zookeeper=INFO
#log4j.logger.org.apache.hadoop.fs.FSNamesystem=DEBUG
log4j.logger.org.apache.hadoop.hbase=INFO
# Make these two classes INFO-level. Make them DEBUG to see more zk debug.
log4j.logger.org.apache.hadoop.hbase.zookeeper.ZKUtil=INFO
log4j.logger.org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher=INFO
#log4j.logger.org.apache.hadoop.dfs=DEBUG
# Set this class to log INFO only otherwise its OTT
# Enable this to get detailed connection error/retry logging.
# log4j.logger.org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation=TRACE# Uncomment this line to enable tracing on _every_ RPC call (this can be a lot of output)
#log4j.logger.org.apache.hadoop.ipc.HBaseServer.trace=DEBUG# Uncomment the below if you want to remove logging of client region caching'
# and scan of hbase:meta messages
# log4j.logger.org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation=INFO
# log4j.logger.org.apache.hadoop.hbase.client.MetaScanner=INFO# Prevent metrics subsystem start/stop messages (HBASE-17722)
log4j.logger.org.apache.hadoop.metrics2.impl.MetricsConfig=WARN
log4j.logger.org.apache.hadoop.metrics2.impl.MetricsSinkAdapter=WARN
log4j.logger.org.apache.hadoop.metrics2.impl.MetricsSystemImpl=WARN

4)   HBaseUtil类:主要用于封装HBase的常用DDL操作,如:创建命名空间、创建表等 

package utils;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.util.Bytes;import java.io.IOException;
import java.text.DecimalFormat;
import java.util.Arrays;
import java.util.Iterator;
import java.util.TreeSet;public class HBaseUtil {/*** 判断表是否存在* @param conf HBaseConfiguration* @param tableName* @return*/public static boolean isExistTable(Configuration conf, String tableName) throws IOException {Connection connection = ConnectionFactory.createConnection(conf);Admin admin = connection.getAdmin();boolean result = admin.tableExists(TableName.valueOf(tableName));admin.close();connection.close();return result;}/*** 初始化命名空间* @param conf* @param namespace*/public static void initNamespace(Configuration conf, String namespace) throws IOException {Connection connection = ConnectionFactory.createConnection(conf);Admin admin = connection.getAdmin();NamespaceDescriptor nd = NamespaceDescriptor.create(namespace).addConfiguration("CREATE_TIME", String.valueOf(System.currentTimeMillis())).addConfiguration("AUTHOR", "Zhang San").build();admin.createNamespace(nd);admin.close();connection.close();}/*** 创建表:协处理器* @param conf* @param tableName* @param columnFamily* @throws IOException*/public static void createTable(Configuration conf, String tableName, int regions, String... columnFamily) throws IOException {Connection connection = ConnectionFactory.createConnection(conf);Admin admin = connection.getAdmin();if(isExistTable(conf, tableName)) return;HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(tableName));for(String cf: columnFamily){htd.addFamily(new HColumnDescriptor(cf));}htd.addCoprocessor("hbase.CalleeWriteObserver");admin.createTable(htd, genSplitKeys(regions));admin.close();connection.close();}private static byte[][] genSplitKeys(int regions){//定义一个存放分区键的数组String[] keys = new String[regions];//目前推算,region个数不会超过2位数,所以region分区键格式化为两位数字所代表的字符串DecimalFormat df = new DecimalFormat("00");for(int i = 0; i < regions; i ++){keys[i] = df.format(i) + "|";}byte[][] splitKeys = new byte[regions][];//生成byte[][]类型的分区键的时候,一定要保证分区键是有序的TreeSet<byte[]> treeSet = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);for(int i = 0; i < regions; i++){treeSet.add(Bytes.toBytes(keys[i]));}Iterator<byte[]> splitKeysIterator = treeSet.iterator();int index = 0;while(splitKeysIterator.hasNext()){byte[] b = splitKeysIterator.next();splitKeys[index ++] = b;}return splitKeys;}/*** 生成rowkey* regionCode_call1_buildTime_call2_flag_duration* @return*/public static String genRowKey(String regionCode, String call1, String buildTime, String call2, String flag, String duration){StringBuilder sb = new StringBuilder();sb.append(regionCode + "_").append(call1 + "_").append(buildTime + "_").append(call2 + "_").append(flag + "_").append(duration);return sb.toString();}/*** 手机号:15837312345* 通话建立时间:2023-01-10 11:20:30 -> 20170110112030* @param call1* @param buildTime* @param regions* @return*/public static String genRegionCode(String call1, String buildTime, int regions){int len = call1.length();//取出后4位号码String lastPhone = call1.substring(len - 4);//取出年月String ym = buildTime.replaceAll("-", "").replaceAll(":", "").replaceAll(" ", "").substring(0, 6);//离散操作1Integer x = Integer.valueOf(lastPhone) ^ Integer.valueOf(ym);//离散操作2int y = x.hashCode();//生成分区号int regionCode = y % regions;//格式化分区号DecimalFormat df = new DecimalFormat("00");return  df.format(regionCode);}
}

上面代码中的HBase协处理器“htd.addCoprocessor("hbase.CalleeWriteObserver");”,具体代码在下面步骤中给出。

5)   HBaseDAO类:主要用于执行具体的DML操作,如保存数据、查询数据、Rowkey生成规则等

package hbase;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import utils.ConnectionInstance;
import utils.HBaseUtil;
import utils.PropertiesUtil;import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.List;public class HBaseDAO {private int regions;private String namespace;private String tableName;public static final Configuration conf;private HTable table;private Connection connection;private SimpleDateFormat sdf1 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");private SimpleDateFormat sdf2 = new SimpleDateFormat("yyyyMMddHHmmss");private List<Put> cacheList = new ArrayList<Put>();static {conf = HBaseConfiguration.create();String zookeeperQuorum = PropertiesUtil.getProperty("hbase.zookeeper.quorum");conf.set("hbase.zookeeper.quorum", zookeeperQuorum);}public HBaseDAO() {try {regions = Integer.valueOf(PropertiesUtil.getProperty("hbase.calllog.regions"));namespace = PropertiesUtil.getProperty("hbase.calllog.namespace");tableName = PropertiesUtil.getProperty("hbase.calllog.tablename");if (!HBaseUtil.isExistTable(conf, tableName)) {HBaseUtil.initNamespace(conf, namespace);HBaseUtil.createTable(conf, tableName, regions, "f1", "f2");}} catch (IOException e) {e.printStackTrace();}}/*** ori数据样式: 18576581848,17269452013,2017-08-14 13:38:31,1761* rowkey样式:01_18576581848_20170814133831_17269452013_1_1761* HBase表的列:call1  call2   build_time   build_time_ts   flag   duration* @param ori*/public void put(String ori) {try {if(cacheList.size() == 0){connection = ConnectionInstance.getConnection(conf);table = (HTable) connection.getTable(TableName.valueOf(tableName));
//                table.setAutoFlushTo(false);
//                table.setWriteBufferSize(2 * 1024 * 1024);}String[] splitOri = ori.split(",");String caller = splitOri[0];String callee = splitOri[1];String buildTime = splitOri[2];String duration = splitOri[3];String regionCode = HBaseUtil.genRegionCode(caller, buildTime, regions);String buildTimeReplace = sdf2.format(sdf1.parse(buildTime));String buildTimeTs = String.valueOf(sdf1.parse(buildTime).getTime());//生成rowkeyString rowkey = HBaseUtil.genRowKey(regionCode, caller, buildTimeReplace, callee, "1", duration);//向表中插入该条数据Put put = new Put(Bytes.toBytes(rowkey));put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("call1"), Bytes.toBytes(caller));put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("call2"), Bytes.toBytes(callee));put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("build_time"), Bytes.toBytes(buildTime));put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("build_time_ts"), Bytes.toBytes(buildTimeTs));put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("flag"), Bytes.toBytes("1"));put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("duration"), Bytes.toBytes(duration));cacheList.add(put);if(cacheList.size() >= 30){table.put(cacheList);table.close();cacheList.clear();}} catch (IOException e) {e.printStackTrace();} catch (ParseException e) {e.printStackTrace();}}
}

6)ConnectionInstance类:主要负责建立HBase连接

package utils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import java.io.IOException;
public class ConnectionInstance {private static Connection conn;public static synchronized Connection getConnection(Configuration conf) {try {if(conn == null || conn.isClosed()){conn = ConnectionFactory.createConnection(conf);}} catch (IOException e) {e.printStackTrace();}return conn;}
}

7)优化HBase数据存储方案,编写协处理器

在使用HBase查询数据时,尽量使用RowKey去定位数据,而非使用ColumnValueFilter或者SingleColumnValueFilter,因为在数据量较大的情况下Filter如果涉及到全表扫描时,效率是非常低的,所以在范围查询时尽量不要使用Filter,而是使用RowKey。

在项目中为了能够让数据尽量离散化,从而避免数据倾斜的发生,我们指定了若干的分区键,为了能让数据根据行键尽可能的分散到各分区当中,在这里行键的生成规则为:regionCode_call1_buildTime_call2_flag_duration,其中regionCode是分区号决定了数据会落入哪一个分区,也决定了是否会发生数据倾斜,regionCode的生成是使用第一个手机号(call1)的后4位和通话时间(年/月)经过两次离散操作,然后再和分区数取余生成的,这样就能保证每个月的通话记录都保存在同一个分区中。

在执行HBase数据查询,设置查询范围时,实际上regionCode生成规则是通过第一个手机号和通话时间(年/月)生成的,所以如果一个人在一个月内如果只接电话,而没有打电话的话,实际上是查不到通话记录的。解决这个问题的方法很多,现在用的比较多的方式是:以存储换效率,也就是说为了保证查询效率可以牺牲一定的存储空间。具体的做法是:将一条记录的第一个号码(call1)和第二个号码(call2)换一个位置,再次插入到数据表中,这样一条记录就会变成两条记录,即一条主叫记录和一条被叫记录。

我们可以在插入数据时,同时插入两条数据。在这里我们也可以使用协处理器为完成,即:当插入一条主叫记录时,会触发协处理器同时插入一条被叫记录。使用协处理器的目的一是可以启动两个线程来完成数据插入工作,第二、也可以让代码更加解耦、更清晰,方便管理。

在上面步骤的HBaseUtil代码中添加了协处理器。

  1. 什么是协处理器;
  2. 协处理器的特性;
  3. 协处理器的应用场景;
  4. 协处理器的分类;
  5. 怎么使用协处理器。

思路:

  1. 编写协处理器,用于协助处理HBase的相关操作,如增删改查等;
  2. 当一条主叫日志成功插入后,在协处理器中,将该日志切换为被叫视角再次插入一次,放到与主叫日志不同的列族中;
  3. 在创建HBase表时,为该表设置协处理器(“htd.addCoprocessor("hbase.CalleeWriteObserver");”);
  4. 上传协处理器的jar包到HDFS;
  5. 在HBase命令行中设置表的协处理器。

新建协处理器类:CalleeWriteObserver,并重写postPut方法,该方法会在数据成功插入之后被回调:

package hbase;import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WALEdit;
import utils.HBaseUtil;
import utils.PropertiesUtil;import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Optional;public class CalleeWriteObserver implements RegionObserver, RegionCoprocessor {SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmss");private RegionCoprocessorEnvironment env = null;@Overridepublic Optional<RegionObserver> getRegionObserver() {// Extremely important to be sure that the coprocessor is invoked as a RegionObserverreturn Optional.of(this);}@Overridepublic void start(CoprocessorEnvironment e) throws IOException {env = (RegionCoprocessorEnvironment) e;}@Overridepublic void stop(CoprocessorEnvironment e) throws IOException {// nothing to do here}@Overridepublic void prePut(final ObserverContext<RegionCoprocessorEnvironment> e,final Put put, final WALEdit edit, final Durability durability)throws IOException {//1、获取你想要操作的目标表的名称
//        String targetTableName = PropertiesUtil.getProperty("hbase.calllog.tablename");String targetTableName = "ns_ct:calllog";//2、获取当前成功Put了数据的表(不一定是我们当前业务想要操作的表)String currentTableName = e.getEnvironment().getRegionInfo().getTable().getNameAsString();if(!targetTableName.equals(currentTableName)) return;//01_18047140826_20180110154530_17864211243_1_0360String oriRowKey = Bytes.toString(put.getRow());String[] splitOriRowKey = oriRowKey.split("_");String oldFlag = splitOriRowKey[4];//如果当前插入的是被叫数据,则直接返回(因为默认提供的数据全部为主叫数据)if(oldFlag.equals("0")) return;//        int regions = Integer.valueOf(PropertiesUtil.getProperty("hbase.calllog.regions"));int regions = 3;String caller = splitOriRowKey[1];String callee = splitOriRowKey[3];String buildTime = splitOriRowKey[2];String flag = "0";String duration = splitOriRowKey[5];String regionCode = HBaseUtil.genRegionCode(callee, buildTime, regions);String calleeRowKey = HBaseUtil.genRowKey(regionCode, callee, buildTime, caller, flag, duration);//生成时间戳String buildTimeTs = "";try {buildTimeTs = String.valueOf(sdf.parse(buildTime).getTime());} catch (ParseException e1) {e1.printStackTrace();}// call1 call2 build_time build_time_tsPut calleePut = new Put(Bytes.toBytes(calleeRowKey));calleePut.addColumn(Bytes.toBytes("f2"), Bytes.toBytes("call1"), Bytes.toBytes(callee));calleePut.addColumn(Bytes.toBytes("f2"), Bytes.toBytes("call2"), Bytes.toBytes(caller));calleePut.addColumn(Bytes.toBytes("f2"),Bytes.toBytes("build_time"), Bytes.toBytes(buildTime));calleePut.addColumn(Bytes.toBytes("f2"), Bytes.toBytes("flag"), Bytes.toBytes(flag));calleePut.addColumn(Bytes.toBytes("f2"), Bytes.toBytes("duration"), Bytes.toBytes(duration));calleePut.addColumn(Bytes.toBytes("f2"),Bytes.toBytes("build_time_ts"),Bytes.toBytes(buildTimeTs));Bytes.toBytes(100L);//        Table table = e.getEnvironment().getTable(TableName.valueOf(targetTableName));Table table = e.getEnvironment().getConnection().getTable(TableName.valueOf(targetTableName));table.put(calleePut);table.close();}
}

8)   运行测试 

a)   在项目中选择右侧的Maven标签,双击Lifecycle->package对项目进行打包,当现实“BUILD SUCESS”后,在项目的target文件下将会生成ct_consumer-1.0-SNAPSHOT.jar文件

b)   在HBase表中应用协处理器

将jar包上传到HDFS中(供协处理器使用),执行以下代码:(jar包在ct_consumer项目所在的target目录中,请根据真实情况进入项目目录,此处jar默认在/root/IdeaProjects/ct_consumer/target目录中)

[root@master1 ~]# cd /root/IdeaProjects/ct_consumer/target
[root@master1 target]# hdfs dfs -mkdir -p /hbase/coprocessor/
[root@master1 target]# hdfs dfs -put ct_consumer-1.0-SNAPSHOT.jar /hbase/coprocessor/

启动HBase,如未启动【master1】使用如下命令启动:

[root@master1 ~]# start-hbase.sh

进入HBase命令行将协处理器应用到表,执行【hbase shell】进入HBase命令行,在命令行中执行以下代码:

hbase(main):001:0>  disable 'ns_ct:calllog'
hbase(main):002:0>  alter 'ns_ct:calllog', METHOD => 'table_att', 'coprocessor'=>'/hbase/coprocessor/ct_consumer-1.0-SNAPSHOT.jar|hbase.CalleeWriteObserver|100'
hbase(main):003:0>  enable 'ns_ct:calllog'
hbase(main):003:0>  quit

运行HBaseConsumerl类测试

进入HBase观察【ns_ct:calllog】表的数据

hbase(main):001:0> scan 'ns_ct:calllog',{STARTROW=> '0' , LIMIT => 10}

或者通过count命令查看当前表的总记录数量

hbase(main):002:0> count 'ns_ct:calllog'

如回显所示,在程序运行一定时间后,总记录条数为6720(建议运行一段时间程序以便在“数据展示”任务重得到更好的展示效果)

hbase(main):003:0> count 'ns_ct:calllog'
Current count: 1000, row: 00_16264433631_20171024123701_17269452013_1_1179                                             
Current count: 2000, row: 00_18576581848_20170704010025_15542823911_1_0229                                             
Current count: 3000, row: 01_15978226424_20170731100126_18468618874_1_0795                                             
Current count: 4000, row: 01_18468618874_20171122084724_13980337439_0_0176                                             
Current count: 5000, row: 02_15714728273_20170420175940_17005930322_1_1437                                             
Current count: 6000, row: 02_17526304161_20170714141822_17078388295_1_0325                                             
6720 row(s)
Took 0.6766 seconds                                                                                                    
=> 6720

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/1474370.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

期末考试结束,老师该如何私发成绩?

随着期末考试的落幕&#xff0c;校园里又恢复了往日的宁静。然而&#xff0c;对于老师们来说&#xff0c;这并不意味着工作的结束&#xff0c;相反&#xff0c;一系列繁琐的任务才刚刚开始。 成绩单的发放&#xff0c;就是其中一项让人头疼的工作。家长们焦急地等待着孩子的考试…

利用pg_rman进行备份与恢复操作

文章目录 pg_rman简介一、安装配置pg_rman二、创建表与用户三、备份与恢复 pg_rman简介 pg_rman 是 PostgreSQL 的在线备份和恢复工具。类似oracle 的 rman pg_rman 项目的目标是提供一种与 pg_dump 一样简单的在线备份和 PITR 方法。此外&#xff0c;它还为每个数据库集群维护…

kubernetes集群部署:node节点部署和cri-docker运行时安装(四)

安装前准备 同《kubernetes集群部署&#xff1a;环境准备及master节点部署&#xff08;二&#xff09;》 安装cri-docker 在 Kubernetes 1.20 版本之前&#xff0c;Docker 是 Kubernetes 默认的容器运行时。然而&#xff0c;Kubernetes 社区决定在 Kubernetes 1.20 及以后的…

cs231n作业1——SVM

参考文章&#xff1a;cs231n assignment1——SVM SVM 训练阶段&#xff0c;我们的目的是为了得到合适的 &#x1d44a; 和 &#x1d44f; &#xff0c;为实现这一目的&#xff0c;我们需要引进损失函数&#xff0c;然后再通过梯度下降来训练模型。 def svm_loss_naive(W, …

vCenter登录失败报500错误:no healthy upstream

过了个周末登录vCenter的时候提示&#xff1a;HTTP状态500 - 内部服务器错误&#xff1b;重启服务后提示&#xff1a;no healthy upstream。如下图&#xff1a; 看到这个情况&#xff0c;肯定就是部分不服务异常了或者压根就没有启动。至于说因为啥异常还不得而知。想着登录管理…

无人机人员搜救

人员搜救-水域救援 水域搜救&#xff1a;快速水面搜查 物资抛投&#xff1a;救生物资抛投 绳索牵引&#xff1a;牵引救援绳索 领航船艇&#xff1a;水面侦察领航 人员搜救 昼夜搜救&#xff0c;精准定位 水域搜救 经纬 M300 RTK 搭载禅思 H20T 能够满足全天候作业需求&a…

开关电源——调制模式和工作模式

一、开关电源的调制模式 开关电源作为一种广泛应用于电子设备中&#xff0c;用于将一定电压和电流转换为另一种电压和电流的技术&#xff0c;以下是开关电源三种常见的调制模式&#xff1a; 脉冲宽度调制&#xff08;Pulse Width Modulation&#xff09; 脉冲频率调制&#xff…

Transformer神经网络回归预测的MATLAB实现

Transformer神经网络最初是为自然语言处理&#xff08;NLP&#xff09;任务设计的&#xff0c;但它们也可以成功应用于其他序列数据的处理&#xff0c;如时间序列预测和回归任务。 在回归预测中使用Transformer网络通常涉及以下关键步骤和概念&#xff1a; 1. Transformer架…

pycharm无法添加python解释器的解决方法

出现该错误的原因是先前创建过重名的解释器&#xff08;虚拟环境&#xff09;&#xff0c;在pycharm配置中没有完全删除干净。解决方法如下&#xff1a; 首先在文件->设置界面&#xff0c;找到解释器设置。 然后先按图所示点击全部显示虚拟环境&#xff1a; 接着将无法添…

OpenCV教程02:图像处理系统1.0(翻转+形态学+滤波+缩放+旋转)

-------------OpenCV教程集合------------- Python教程99&#xff1a;一起来初识OpenCV&#xff08;一个跨平台的计算机视觉库&#xff09; OpenCV教程01&#xff1a;图像的操作&#xff08;读取显示保存属性获取和修改像素值&#xff09; OpenCV教程02&#xff1a;图像处理…

数字化精益生产系统--QMS质量管理系统

QMS质量管理系统&#xff08;Quality Management System&#xff09;是现代企业管理的关键组成部分&#xff0c;旨在确保产品和服务的质量达到或超过客户需求和期望。 以下是对QMS质量管理系统的功能设计&#xff1a;

运维系列.Nginx配置文件结构功能总结

运维系列 Nginx配置文件结构功能总结 - 文章信息 - Author: 李俊才 (jcLee95) Visit me at CSDN: https://jclee95.blog.csdn.netMy WebSite&#xff1a;http://thispage.tech/Email: 291148484163.com. Shenzhen ChinaAddress of this article:https://blog.csdn.net/qq_285…

塑造卓越企业家IP:多维度视角下的策略解析

在构建和塑造企业家IP的过程中&#xff0c;我们需要从多个维度进行考量&#xff0c;以确保个人品牌能够全面、立体地展现企业家的独特魅力和价值。以下是从不同角度探讨如何做好一个企业家IP的策略。 一、从个人特质出发 深入了解自我&#xff1a;企业家需要清晰地认识到自己的…

Linux 系统管理 03——安装及管理程序

一、rpm 包安装 1、RPM Package Manger 由 Red Hat 公司提供&#xff0c;被众多 Linux 发行版本所采用。 建立统一的数据库文件&#xff0c;详细记录软件包安装、卸载等变化信息&#xff0c;能够自动分析软件包 依赖关系。 2、RPM 软件包 一般命名格式 3、查询已安装的 RP…

电路基础知识汇总

1.0 串连&#xff0c;并联&#xff0c;混连 串联的定义 电路串联是一种电路元件的连接方式&#xff0c;其中各个元件沿着单一路径互相连接&#xff0c;形成一个连续的链。在串联电路中&#xff0c;每个节点最多只连接两个元件&#xff0c;这意味着电流只有一条路径可以通过整个…

昇思25天学习打卡营第十一天|DCGAN生成漫画头像

练习营进入第11天了&#xff0c;今天学习的内容是DCGAN生成漫画头像&#xff0c;记录一下学习内容&#xff1a; GAN基础原理 这部分原理介绍参考GAN图像生成。 DCGAN原理 DCGAN&#xff08;深度卷积对抗生成网络&#xff0c;Deep Convolutional Generative Adversarial Net…

Java的基础语法

叠甲&#xff1a;以下文章主要是依靠我的实际编码学习中总结出来的经验之谈&#xff0c;求逻辑自洽&#xff0c;不能百分百保证正确&#xff0c;有错误、未定义、不合适的内容请尽情指出&#xff01; 文章目录 1.第一份程序1.1.代码编写1.2.代码运行1.2.1.命令行编译1.2.2.IEDA…

FL Studio 2024 发布,添加 FL Cloud 插件、AI 等功能

作为今年最受期待的音乐制作 DAW 更新之一&#xff0c;FL Studio 2024发布引入了新功能&#xff0c;同时采用了新的命名方式&#xff0c;从现在起将把发布年份纳入其名称中。DAW 的新增功能包括在 FL Cloud 中添加插件、AI 驱动的音乐创作工具和 FL Studio 的新效果。 FL Cloud…

【项目设计】负载均衡式——Online Judge

负载均衡式——Online Judge&#x1f60e; 前言&#x1f64c;Online Judge 项目一、项目介绍二、项目技术栈三、项目使用环境四、项目宏观框架五、项目后端服务实现过程1、comm模块设计1.1 Log.hpp实现1.2 Util.hpp实现 2、compiler_server 模块设计2.1compile.hpp文件代码编写…