
Big Data Technology _18_ Offline Big Data Platform _03_: Data Processing + Tool Code Import + Business ETL Implementation + Creating Database Tables

Author: 黑泽君 · Published 2019-04-23 17:32:40

Chapter 16: Data Processing

16.1 ETL Operations

  • Purpose: clean, filter, and enrich the data
  • Data source: log files stored on HDFS
  • Processing engine: MapReduce
  • Storage target: HBase

16.2 HBase Design

16.2.1 One Table per Day

The data is split into one table per day, i.e. one day's data lives in a single table. The rowkey is a random value; it does not need to follow any particular pattern, and it should be hashed as evenly as possible.

16.2.2 Reversed Rowkeys or Numeric Prefixes (Salting)

Rowkey design has to be decided case by case. Sometimes the key is reversed, and sometimes a random number is prepended to the rowkey (that number is usually taken modulo the number of HRegionServers). A minimal sketch of the salting idea follows.
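The helper below only illustrates the prefix-salting idea; it is not part of the project's code, and the region server count would come from the actual cluster.

// Illustrative salting helper: prepend (hash mod regionServerCount) to the original key
// so that consecutive keys are spread across different regions.
public static String saltRowKey(String originalKey, int regionServerCount) {
    int salt = (originalKey.hashCode() & Integer.MAX_VALUE) % regionServerCount;
    return salt + "_" + originalKey;
}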

16.2.3 Pre-splitting Regions

Estimate the data volume from the business ahead of time and create the table with pre-split regions, so that performance is not wasted on frequent region splits and merges. A sketch of creating a pre-split table follows.
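The snippet below is only a sketch (HBase 1.x client API, matching the hbase-client version used by this project); the table name, column family and split points are placeholders, and an already opened Admin handle is assumed.

// Sketch: create a day's event table with pre-split regions.
// Table name, family name and split points are examples only.
public static void createPreSplitTable(Admin admin) throws IOException {
    byte[][] splitKeys = new byte[][] {
            Bytes.toBytes("1"), Bytes.toBytes("2"), Bytes.toBytes("3")   // 3 split keys -> 4 regions
    };
    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("event_logs20151220"));
    htd.addFamily(new HColumnDescriptor("info"));
    admin.createTable(htd, splitKeys);   // regions exist up front, avoiding early splits
}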

16.3 MapReduce Analysis Flow

Processing flow: read from HBase -> InputFormat -> map -> shuffle -> reduce -> OutputFormat -> MySQL. A sketch of how such a job could be wired is shown below.
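Hedged sketch of the wiring only: StatsMapper/StatsReducer, the JDBC settings and the target table/columns are illustrative placeholders, not the project's code, and the fragment would live inside a Tool.run() method like the ETL runner shown in Chapter 18.

// Read from HBase with TableInputFormat, write to MySQL with DBOutputFormat.
Configuration conf = HBaseConfiguration.create();
DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",
        "jdbc:mysql://hadoop102:3306/report", "root", "123456");   // connection details are examples

Job job = Job.getInstance(conf, "hbase-to-mysql-analysis");

Scan scan = new Scan();
scan.setCaching(500);          // larger scanner caching for MR scans
scan.setCacheBlocks(false);    // do not pollute the block cache

// HBase input: TableInputFormat plus a TableMapper subclass (hypothetical StatsMapper)
TableMapReduceUtil.initTableMapperJob("event_logs20151220", scan,
        StatsMapper.class, Text.class, Text.class, job);

// MySQL output: the reducer's output key must implement DBWritable (hypothetical StatsReducer)
job.setReducerClass(StatsReducer.class);
job.setOutputFormatClass(DBOutputFormat.class);
DBOutputFormat.setOutput(job, "stats_table", "dimension_id", "pv"); // target table/columns are examples

job.waitForCompletion(true);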

16.4 Hive Analysis Flow

  • Data source: Hive external tables mapped onto the data tables in HBase
  • Results: stored on HDFS (or saved into a Hive result table)
  • Processing flow: Hive external table -> write UDFs -> write the HQL analysis statements -> save the results into a Hive result table (which also lives on HDFS) -> Sqoop export -> MySQL

16.5 MySQL Table Design

16.5.1 Common Relational Table Models

In multidimensional BI solutions, the common models can be divided into the star schema and the snowflake schema according to how the fact tables and dimension tables are related. When designing the logical data model, you should already decide whether the data will be organized as a star schema or as a snowflake schema.

  • Star schema: a denormalized structure in which every dimension of the cube joins directly to the fact table and there are no intermediate dimensions, so the data carries some redundancy. For example, if the region dimension table contains both "city C of province B, country A" and "city D of province B, country A", the information for country A and province B is stored twice, i.e. it is redundant.
  • Snowflake schema: when one or more dimension tables are not joined to the fact table directly but through other dimension tables, the diagram looks like several snowflakes joined together, hence the name. The snowflake schema is an extension of the star schema: it further normalizes the star schema's dimension tables into hierarchies, and the original dimension tables may be split into smaller tables that form local "hierarchy" areas; these decomposed tables join to the main dimension table rather than to the fact table. For example, the region dimension table can be decomposed into country, province and city dimension tables. Its advantage is that it improves query performance by minimizing the amount of stored data and by joining against smaller dimension tables; the snowflake structure removes data redundancy.

When loading data, the ETL for a snowflake schema is more complex to design, and because of the dependencies between the sub-dimension tables it cannot be fully parallelized.

With a star schema the dimension tables are loaded without any extra sub-dimension tables between them, so the ETL is comparatively simple and can be parallelized to a high degree.

16.5.2 Table Structure
  • Dimension tables: dimension_table
  • Fact tables: stats_table
  • Auxiliary tables: mainly used to help ETL and analysis obtain non-log data, e.g. storing member ids

Chapter 17: Tool Code Import

Project code structure diagram (omitted).

Part of the sample code is shown below:

pom.xml

<project?xmlns="http://maven.apache.org/POM/4.0.0"?xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
????xsi:schemaLocation="http://maven.apache.org/POM/4.0.0?http://maven.apache.org/xsd/maven-4.0.0.xsd">
????<modelVersion>4.0.0</modelVersion>
????<groupId>com.z</groupId>
????<artifactId>transformer</artifactId>
????<version>0.0.1-SNAPSHOT</version>
????<name>transformer</name>
????<url>http://maven.apache.org</url>

????<properties>
????????<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
????</properties>

????<dependencies>
????????<dependency>
????????????<groupId>junit</groupId>
????????????<artifactId>junit</artifactId>
????????????<version>3.8.1</version>
????????????<scope>test</scope>
????????</dependency>

????????<!--?https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-client?-->
????????<dependency>
????????????<groupId>org.apache.hadoop</groupId>
????????????<artifactId>hadoop-client</artifactId>
????????????<version>2.7.2</version>
????????</dependency>

????????<!--?https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-yarn-server-resourcemanager?-->
????????<dependency>
????????????<groupId>org.apache.hadoop</groupId>
????????????<artifactId>hadoop-yarn-server-resourcemanager</artifactId>
????????????<version>2.7.2</version>
????????</dependency>

????????<!--?https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-client?-->
????????<dependency>
????????????<groupId>org.apache.hadoop</groupId>
????????????<artifactId>hadoop-client</artifactId>
????????????<version>2.7.2</version>
????????</dependency>

????????<!--?https://mvnrepository.com/artifact/org.apache.hbase/hbase-client?-->
????????<dependency>
????????????<groupId>org.apache.hbase</groupId>
????????????<artifactId>hbase-client</artifactId>
????????????<version>1.3.1</version>
????????</dependency>

????????<!--?https://mvnrepository.com/artifact/org.apache.hbase/hbase-server?-->
????????<dependency>
????????????<groupId>org.apache.hbase</groupId>
????????????<artifactId>hbase-server</artifactId>
????????????<version>1.3.1</version>
????????</dependency>

????????<!--?https://mvnrepository.com/artifact/org.apache.hive/hive-exec?-->
????????<dependency>
????????????<groupId>org.apache.hive</groupId>
????????????<artifactId>hive-exec</artifactId>
????????????<version>1.2.1</version>
????????</dependency>

????????<!--?mysql?start?-->
????????<dependency>
????????????<groupId>mysql</groupId>
????????????<artifactId>mysql-connector-java</artifactId>
????????????<version>5.1.27</version>
????????</dependency>
????????<!--?mysql?end?-->

????????<!--?用户浏览器解析?-->
????????<dependency>
????????????<groupId>cz.mallat.uasparser</groupId>
????????????<artifactId>uasparser</artifactId>
????????????<version>0.6.1</version>
????????</dependency>

????????<!--?json包?-->
????????<dependency>
????????????<groupId>com.google.code.gson</groupId>
????????????<artifactId>gson</artifactId>
????????????<version>2.6.2</version>
????????</dependency>
????</dependencies>
????<profiles>
????????<profile>
????????????<!--?唯一id,表示本地?-->
????????????<id>local</id>
????????????<activation>
????????????????<!--?maven编译的时候,默认环境,该参数为true只能存在一个?-->
????????????????<activeByDefault>true</activeByDefault>
????????????</activation>
????????????<build>
????????????????<!--?插件信息?-->
????????????????<plugins>
????????????????????<plugin>
????????????????????????<!--?将指定包的java文件进行编译打包操作?-->
????????????????????????<groupId>org.codehaus.mojo</groupId>
????????????????????????<artifactId>build-helper-maven-plugin</artifactId>
????????????????????????<version>1.4</version>
????????????????????????<executions>
????????????????????????????<execution>
????????????????????????????????<id>add-source</id>
????????????????????????????????<phase>generate-sources</phase>
????????????????????????????????<goals>
????????????????????????????????????<goal>add-source</goal>
????????????????????????????????</goals>
????????????????????????????????<configuration>
????????????????????????????????????<sources>
????????????????????????????????????????<source>${basedir}/src/main/java</source>
????????????????????????????????????</sources>
????????????????????????????????</configuration>
????????????????????????????</execution>
????????????????????????</executions>
????????????????????</plugin>
????????????????</plugins>
????????????</build>
????????</profile>

????????<profile>
????????????<!--?需要最终形成一个jar文件?-->
????????????<id>dev</id>
????????????<build>
????????????????<plugins>
????????????????????<plugin>
????????????????????????<groupId>org.codehaus.mojo</groupId>
????????????????????????<artifactId>build-helper-maven-plugin</artifactId>
????????????????????????<version>1.4</version>
????????????????????????<executions>
????????????????????????????<execution>
????????????????????????????????<id>add-source</id>
????????????????????????????????<phase>generate-sources</phase>
????????????????????????????????<goals>
????????????????????????????????????<goal>add-source</goal>
????????????????????????????????</goals>
????????????????????????????????<configuration>
????????????????????????????????????<sources>
????????????????????????????????????????<source>${basedir}/src/main/java</source>
????????????????????????????????????</sources>
????????????????????????????????</configuration>
????????????????????????????</execution>
????????????????????????</executions>
????????????????????</plugin>

????????????????????<plugin>
????????????????????????<!--?将第三方的依赖包,一起打入到最终形成的jar文件中?-->
????????????????????????<groupId>org.apache.maven.plugins</groupId>
????????????????????????<artifactId>maven-shade-plugin</artifactId>
????????????????????????<version>2.1</version>
????????????????????????<executions>
????????????????????????????<execution>
????????????????????????????????<phase>package</phase>
????????????????????????????????<goals>
????????????????????????????????????<goal>shade</goal>
????????????????????????????????</goals>
????????????????????????????????<configuration>
????????????????????????????????????<artifactSet>
????????????????????????????????????????<includes>
????????????????????????????????????????????<include>cz.mallat.uasparser:uasparser</include>
????????????????????????????????????????????<include>net.sourceforge.jregex:jregex</include>
????????????????????????????????????????????<include>mysql:mysql-connector-java</include>
????????????????????????????????????????</includes>
????????????????????????????????????</artifactSet>
????????????????????????????????</configuration>
????????????????????????????</execution>
????????????????????????</executions>
????????????????????</plugin>
????????????????</plugins>
????????????</build>
????????</profile>
????</profiles>

????<build>
????????<testSourceDirectory>src/test/java</testSourceDirectory>
????????<plugins>
????????????<plugin>
????????????????<artifactId>maven-compiler-plugin</artifactId>
????????????????<version>3.3</version>
????????????????<configuration>
????????????????????<source>1.7</source>
????????????????????<target>1.7</target>
????????????????</configuration>
????????????</plugin>
????????</plugins>
????????<pluginManagement>
????????????<plugins>
????????????????<!--This?plugin's?configuration?is?used?to?store?Eclipse?m2e?settings?
????????????????????only.?It?has?no?influence?on?the?Maven?build?itself.?-->
????????????????<plugin>
????????????????????<groupId>org.eclipse.m2e</groupId>
????????????????????<artifactId>lifecycle-mapping</artifactId>
????????????????????<version>1.0.0</version>
????????????????????<configuration>
????????????????????????<lifecycleMappingMetadata>
????????????????????????????<pluginExecutions>
????????????????????????????????<pluginExecution>
????????????????????????????????????<pluginExecutionFilter>
????????????????????????????????????????<groupId>org.codehaus.mojo</groupId>
????????????????????????????????????????<artifactId>
????????????????????????????????????????????build-helper-maven-plugin
????????????????????????????????????????</artifactId>
????????????????????????????????????????<versionRange>[1.4,)</versionRange>
????????????????????????????????????????<goals>
????????????????????????????????????????????<goal>add-source</goal>
????????????????????????????????????????</goals>
????????????????????????????????????</pluginExecutionFilter>
????????????????????????????????????<action>
????????????????????????????????????????<ignore></ignore>
????????????????????????????????????</action>
????????????????????????????????</pluginExecution>
????????????????????????????</pluginExecutions>
????????????????????????</lifecycleMappingMetadata>
????????????????????</configuration>
????????????????</plugin>
????????????</plugins>
????????</pluginManagement>
????</build>
</project>

Files under the resources directory

core-site.xml

<?xml?version="1.0"?encoding="UTF-8"?>
<?xml-stylesheet?type="text/xsl"?href="configuration.xsl"?>
<!--
???????Licensed?under?the?Apache?License,?Version?2.0?(the?"License");
??you?may?not?use?this?file?except?in?compliance?with?the?License.
??You?may?obtain?a?copy?of?the?License?at

????http://www.apache.org/licenses/LICENSE-2.0

??Unless?required?by?applicable?law?or?agreed?to?in?writing,?software
??distributed?under?the?License?is?distributed?on?an?"AS?IS"?BASIS,
??WITHOUT?WARRANTIES?OR?CONDITIONS?OF?ANY?KIND,?either?express?or?implied.
??See?the?License?for?the?specific?language?governing?permissions?and
??limitations?under?the?License.?See?accompanying?LICENSE?file.
-->

<!--?Put?site-specific?property?overrides?in?this?file.?-->

<configuration>
????<!--?指定HDFS中NameNode的地址?-->
????<property>
????????<name>fs.defaultFS</name>
????????<value>hdfs://hadoop102:9000</value>
????</property>

????<!--?指定Hadoop运行时产生文件的存储目录?-->
????<property>
????????<name>hadoop.tmp.dir</name>
????????<value>/opt/module/hadoop-2.7.2/data/tmp</value>
????</property>

????<property>
????????<name>hadoop.proxyuser.admin.hosts</name>
????????<value>*</value>
????</property>

????<property>
????????<name>hadoop.proxyuser.admin.groups</name>
????????<value>*</value>
????</property>

????<property>
????????<name>hadoop.proxyuser.httpfs.hosts</name>
????????<value>*</value>
????</property>

????<property>
????????<name>hadoop.proxyuser.httpfs.groups</name>
????????<value>*</value>
????</property>

????<!--?配置垃圾回收时间为1分钟
????<property>
????????<name>fs.trash.interval</name>
????????<value>1</value>
????</property>
????-->

????<!--?修改访问垃圾回收站用户名称为?atguigu
????<property>
????????<name>hadoop.http.staticuser.user</name>
????????<value>atguigu</value>
????</property>
????-->
</configuration>

hbase-site.xml

<?xml?version="1.0"?>
<?xml-stylesheet?type="text/xsl"?href="configuration.xsl"?>
<!--
/**
?*
?*?Licensed?to?the?Apache?Software?Foundation?(ASF)?under?one
?*?or?more?contributor?license?agreements.??See?the?NOTICE?file
?*?distributed?with?this?work?for?additional?information
?*?regarding?copyright?ownership.??The?ASF?licenses?this?file
?*?to?you?under?the?Apache?License,?Version?2.0?(the
?*?"License");?you?may?not?use?this?file?except?in?compliance
?*?with?the?License.??You?may?obtain?a?copy?of?the?License?at
?*
?*?????http://www.apache.org/licenses/LICENSE-2.0
?*
?*?Unless?required?by?applicable?law?or?agreed?to?in?writing,?software
?*?distributed?under?the?License?is?distributed?on?an?"AS?IS"?BASIS,
?*?WITHOUT?WARRANTIES?OR?CONDITIONS?OF?ANY?KIND,?either?express?or?implied.
?*?See?the?License?for?the?specific?language?governing?permissions?and
?*?limitations?under?the?License.
?*/
-->
<configuration>
????<property>
????????<name>hbase.rootdir</name>
????????<value>hdfs://hadoop102:9000/hbase</value>
????</property>

????<property>
????????<name>hbase.cluster.distributed</name>
????????<value>true</value>
????</property>

????<!--?0.98后的新变动,之前版本没有.port,默认端口为16000?-->
????<property>
????????<name>hbase.master.port</name>
????????<value>16000</value>
????</property>

????<property>
????????<name>hbase.zookeeper.quorum</name>
????????<value>hadoop102:2181,hadoop103:2181,hadoop104:2181</value>
????</property>

????<property>
????????<name>hbase.zookeeper.property.dataDir</name>
????????<value>/opt/module/zookeeper-3.4.10/zkData</value>
????</property>

????<property>
????????<name>hbase.coprocessor.region.classes</name>
????????<value>com.china.hbase.CalleeWriteObserver</value>
????</property>

????<property>
????????<name>zookeeper.session.timeout</name>
????????<value>90000</value>
????</property>
</configuration>

hdfs-site.xml

<?xml?version="1.0"?encoding="UTF-8"?>
<?xml-stylesheet?type="text/xsl"?href="configuration.xsl"?>
<!--
??Licensed?under?the?Apache?License,?Version?2.0?(the?"License");
??you?may?not?use?this?file?except?in?compliance?with?the?License.
??You?may?obtain?a?copy?of?the?License?at

????http://www.apache.org/licenses/LICENSE-2.0

??Unless?required?by?applicable?law?or?agreed?to?in?writing,?software
??distributed?under?the?License?is?distributed?on?an?"AS?IS"?BASIS,
??WITHOUT?WARRANTIES?OR?CONDITIONS?OF?ANY?KIND,?either?express?or?implied.
??See?the?License?for?the?specific?language?governing?permissions?and
??limitations?under?the?License.?See?accompanying?LICENSE?file.
-->

<!--?Put?site-specific?property?overrides?in?this?file.?-->

<configuration>
????<!--?指定HDFS副本的数量,默认是3个?-->
????<property>
????????<name>dfs.replication</name>
????????<value>1</value>
????</property>

????<!--?指定Hadoop辅助名称节点主机配置?-->
????<property>
????????<name>dfs.namenode.secondary.http-address</name>
????????<value>hadoop104:50090</value>
????</property>

????<!--?关闭权限检查-->
????<property>
????????<name>dfs.permissions.enable</name>
????????<value>false</value>
????</property>

????<property>
????????<name>dfs.webhdfs.enabled</name>
????????<value>true</value>
????</property>

????<!--?NameNode的本地目录可以配置成多个,且每个目录存放内容相同,增加了可靠性。
????<property>
????????<name>dfs.namenode.name.dir</name>
????????<value>file:///${hadoop.tmp.dir}/dfs/name1,file:///${hadoop.tmp.dir}/dfs/name2</value>
????</property>
????-->

????<!--?DataNode也可以配置成多个目录,每个目录存储的数据不一样。即:数据不是副本。
????<property>
????????<name>dfs.datanode.data.dir</name>
????????<value>file:///${hadoop.tmp.dir}/dfs/data1,file:///${hadoop.tmp.dir}/dfs/data2</value>
????</property>
????-->

????<!--?白名单信息
????<property>
????????<name>dfs.hosts</name>
????????<value>/opt/module/hadoop-2.7.2/etc/hadoop/dfs.hosts</value>
????</property>
????-->

????<!--?黑名单信息
????<property>
????????<name>dfs.hosts.exclude</name>
????????<value>/opt/module/hadoop-2.7.2/etc/hadoop/dfs.hosts.exclude</value>
????</property>
????-->
</configuration>

log4j.properties

#?Licensed?to?the?Apache?Software?Foundation?(ASF)?under?one
#?or?more?contributor?license?agreements.??See?the?NOTICE?file
#?distributed?with?this?work?for?additional?information
#?regarding?copyright?ownership.??The?ASF?licenses?this?file
#?to?you?under?the?Apache?License,?Version?2.0?(the
#?"License");?you?may?not?use?this?file?except?in?compliance
#?with?the?License.??You?may?obtain?a?copy?of?the?License?at
#
#?????http://www.apache.org/licenses/LICENSE-2.0
#
#?Unless?required?by?applicable?law?or?agreed?to?in?writing,?software
#?distributed?under?the?License?is?distributed?on?an?"AS?IS"?BASIS,
#?WITHOUT?WARRANTIES?OR?CONDITIONS?OF?ANY?KIND,?either?express?or?implied.
#?See?the?License?for?the?specific?language?governing?permissions?and
#?limitations?under?the?License.

#?Define?some?default?values?that?can?be?overridden?by?system?properties
hadoop.root.logger=INFO,console
hadoop.log.dir=.
hadoop.log.file=hadoop.log

#?Define?the?root?logger?to?the?system?property?"hadoop.root.logger".
log4j.rootLogger=${hadoop.root.logger},?EventCounter

#?Logging?Threshold
log4j.threshold=ALL

#?Null?Appender
log4j.appender.NullAppender=org.apache.log4j.varia.NullAppender

#
#?Rolling?File?Appender?-?cap?space?usage?at?5gb.
#
hadoop.log.maxfilesize=256MB
hadoop.log.maxbackupindex=20
log4j.appender.RFA=org.apache.log4j.RollingFileAppender
log4j.appender.RFA.File=${hadoop.log.dir}/${hadoop.log.file}

log4j.appender.RFA.MaxFileSize=${hadoop.log.maxfilesize}
log4j.appender.RFA.MaxBackupIndex=${hadoop.log.maxbackupindex}

log4j.appender.RFA.layout=org.apache.log4j.PatternLayout

#?Pattern?format:?Date?LogLevel?LoggerName?LogMessage
log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601}?%p?%c:?%m%n
#?Debugging?Pattern?format
#log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601}?%-5p?%c{2}?(%F:%M(%L))?-?%m%n


#
#?Daily?Rolling?File?Appender
#

log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender
log4j.appender.DRFA.File=${hadoop.log.dir}/${hadoop.log.file}

#?Rollver?at?midnight
log4j.appender.DRFA.DatePattern=.yyyy-MM-dd

#?30-day?backup
#log4j.appender.DRFA.MaxBackupIndex=30
log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout

#?Pattern?format:?Date?LogLevel?LoggerName?LogMessage
log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601}?%p?%c:?%m%n
#?Debugging?Pattern?format
#log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601}?%-5p?%c{2}?(%F:%M(%L))?-?%m%n


#
#?console
#?Add?"console"?to?rootlogger?above?if?you?want?to?use?this?
#

log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd?HH:mm:ss}?%p?%c{2}:?%m%n

#
#?TaskLog?Appender
#

#Default?values
hadoop.tasklog.taskid=null
hadoop.tasklog.iscleanup=false
hadoop.tasklog.noKeepSplits=4
hadoop.tasklog.totalLogFileSize=100
hadoop.tasklog.purgeLogSplits=true
hadoop.tasklog.logsRetainHours=12

log4j.appender.TLA=org.apache.hadoop.mapred.TaskLogAppender
log4j.appender.TLA.taskId=${hadoop.tasklog.taskid}
log4j.appender.TLA.isCleanup=${hadoop.tasklog.iscleanup}
log4j.appender.TLA.totalLogFileSize=${hadoop.tasklog.totalLogFileSize}

log4j.appender.TLA.layout=org.apache.log4j.PatternLayout
log4j.appender.TLA.layout.ConversionPattern=%d{ISO8601}?%p?%c:?%m%n

#
#?HDFS?block?state?change?log?from?block?manager
#
#?Uncomment?the?following?to?suppress?normal?block?state?change
#?messages?from?BlockManager?in?NameNode.
#log4j.logger.BlockStateChange=WARN

#
#Security?appender
#
hadoop.security.logger=INFO,NullAppender
hadoop.security.log.maxfilesize=256MB
hadoop.security.log.maxbackupindex=20
log4j.category.SecurityLogger=${hadoop.security.logger}
hadoop.security.log.file=SecurityAuth-${user.name}.audit
log4j.appender.RFAS=org.apache.log4j.RollingFileAppender?
log4j.appender.RFAS.File=${hadoop.log.dir}/${hadoop.security.log.file}
log4j.appender.RFAS.layout=org.apache.log4j.PatternLayout
log4j.appender.RFAS.layout.ConversionPattern=%d{ISO8601}?%p?%c:?%m%n
log4j.appender.RFAS.MaxFileSize=${hadoop.security.log.maxfilesize}
log4j.appender.RFAS.MaxBackupIndex=${hadoop.security.log.maxbackupindex}

#
#?Daily?Rolling?Security?appender
#
log4j.appender.DRFAS=org.apache.log4j.DailyRollingFileAppender?
log4j.appender.DRFAS.File=${hadoop.log.dir}/${hadoop.security.log.file}
log4j.appender.DRFAS.layout=org.apache.log4j.PatternLayout
log4j.appender.DRFAS.layout.ConversionPattern=%d{ISO8601}?%p?%c:?%m%n
log4j.appender.DRFAS.DatePattern=.yyyy-MM-dd

#
#?hadoop?configuration?logging
#

#?Uncomment?the?following?line?to?turn?off?configuration?deprecation?warnings.
#?log4j.logger.org.apache.hadoop.conf.Configuration.deprecation=WARN

#
#?hdfs?audit?logging
#
hdfs.audit.logger=INFO,NullAppender
hdfs.audit.log.maxfilesize=256MB
hdfs.audit.log.maxbackupindex=20
log4j.logger.org.apache.hadoop.hdfs.server.namenode.FSNamesystem.audit=${hdfs.audit.logger}
log4j.additivity.org.apache.hadoop.hdfs.server.namenode.FSNamesystem.audit=false
log4j.appender.RFAAUDIT=org.apache.log4j.RollingFileAppender
log4j.appender.RFAAUDIT.File=${hadoop.log.dir}/hdfs-audit.log
log4j.appender.RFAAUDIT.layout=org.apache.log4j.PatternLayout
log4j.appender.RFAAUDIT.layout.ConversionPattern=%d{ISO8601}?%p?%c{2}:?%m%n
log4j.appender.RFAAUDIT.MaxFileSize=${hdfs.audit.log.maxfilesize}
log4j.appender.RFAAUDIT.MaxBackupIndex=${hdfs.audit.log.maxbackupindex}

#
#?mapred?audit?logging
#
mapred.audit.logger=INFO,NullAppender
mapred.audit.log.maxfilesize=256MB
mapred.audit.log.maxbackupindex=20
log4j.logger.org.apache.hadoop.mapred.AuditLogger=${mapred.audit.logger}
log4j.additivity.org.apache.hadoop.mapred.AuditLogger=false
log4j.appender.MRAUDIT=org.apache.log4j.RollingFileAppender
log4j.appender.MRAUDIT.File=${hadoop.log.dir}/mapred-audit.log
log4j.appender.MRAUDIT.layout=org.apache.log4j.PatternLayout
log4j.appender.MRAUDIT.layout.ConversionPattern=%d{ISO8601}?%p?%c{2}:?%m%n
log4j.appender.MRAUDIT.MaxFileSize=${mapred.audit.log.maxfilesize}
log4j.appender.MRAUDIT.MaxBackupIndex=${mapred.audit.log.maxbackupindex}

#?Custom?Logging?levels

#log4j.logger.org.apache.hadoop.mapred.JobTracker=DEBUG
#log4j.logger.org.apache.hadoop.mapred.TaskTracker=DEBUG
#log4j.logger.org.apache.hadoop.hdfs.server.namenode.FSNamesystem.audit=DEBUG

#?Jets3t?library
log4j.logger.org.jets3t.service.impl.rest.httpclient.RestS3Service=ERROR

#?AWS?SDK?&?S3A?FileSystem
log4j.logger.com.amazonaws=ERROR
log4j.logger.com.amazonaws.http.AmazonHttpClient=ERROR
log4j.logger.org.apache.hadoop.fs.s3a.S3AFileSystem=WARN

#
#?Event?Counter?Appender
#?Sends?counts?of?logging?messages?at?different?severity?levels?to?Hadoop?Metrics.
#
log4j.appender.EventCounter=org.apache.hadoop.log.metrics.EventCounter

#
#?Job?Summary?Appender?
#
#?Use?following?logger?to?send?summary?to?separate?file?defined?by?
#?hadoop.mapreduce.jobsummary.log.file?:
#?hadoop.mapreduce.jobsummary.logger=INFO,JSA
#?
hadoop.mapreduce.jobsummary.logger=${hadoop.root.logger}
hadoop.mapreduce.jobsummary.log.file=hadoop-mapreduce.jobsummary.log
hadoop.mapreduce.jobsummary.log.maxfilesize=256MB
hadoop.mapreduce.jobsummary.log.maxbackupindex=20
log4j.appender.JSA=org.apache.log4j.RollingFileAppender
log4j.appender.JSA.File=${hadoop.log.dir}/${hadoop.mapreduce.jobsummary.log.file}
log4j.appender.JSA.MaxFileSize=${hadoop.mapreduce.jobsummary.log.maxfilesize}
log4j.appender.JSA.MaxBackupIndex=${hadoop.mapreduce.jobsummary.log.maxbackupindex}
log4j.appender.JSA.layout=org.apache.log4j.PatternLayout
log4j.appender.JSA.layout.ConversionPattern=%d{yy/MM/dd?HH:mm:ss}?%p?%c{2}:?%m%n
log4j.logger.org.apache.hadoop.mapred.JobInProgress$JobSummary=${hadoop.mapreduce.jobsummary.logger}
log4j.additivity.org.apache.hadoop.mapred.JobInProgress$JobSummary=false

#
#?Yarn?ResourceManager?Application?Summary?Log?
#
#?Set?the?ResourceManager?summary?log?filename
yarn.server.resourcemanager.appsummary.log.file=rm-appsummary.log
#?Set?the?ResourceManager?summary?log?level?and?appender
yarn.server.resourcemanager.appsummary.logger=${hadoop.root.logger}
#yarn.server.resourcemanager.appsummary.logger=INFO,RMSUMMARY

#?To?enable?AppSummaryLogging?for?the?RM,?
#?set?yarn.server.resourcemanager.appsummary.logger?to?
#?<LEVEL>,RMSUMMARY?in?hadoop-env.sh

#?Appender?for?ResourceManager?Application?Summary?Log
#?Requires?the?following?properties?to?be?set
#????-?hadoop.log.dir?(Hadoop?Log?directory)
#????-?yarn.server.resourcemanager.appsummary.log.file?(resource?manager?app?summary?log?filename)
#????-?yarn.server.resourcemanager.appsummary.logger?(resource?manager?app?summary?log?level?and?appender)

log4j.logger.org.apache.hadoop.yarn.server.resourcemanager.RMAppManager$ApplicationSummary=${yarn.server.resourcemanager.appsummary.logger}
log4j.additivity.org.apache.hadoop.yarn.server.resourcemanager.RMAppManager$ApplicationSummary=false
log4j.appender.RMSUMMARY=org.apache.log4j.RollingFileAppender
log4j.appender.RMSUMMARY.File=${hadoop.log.dir}/${yarn.server.resourcemanager.appsummary.log.file}
log4j.appender.RMSUMMARY.MaxFileSize=256MB
log4j.appender.RMSUMMARY.MaxBackupIndex=20
log4j.appender.RMSUMMARY.layout=org.apache.log4j.PatternLayout
log4j.appender.RMSUMMARY.layout.ConversionPattern=%d{ISO8601}?%p?%c{2}:?%m%n

#?HS?audit?log?configs
#mapreduce.hs.audit.logger=INFO,HSAUDIT
#log4j.logger.org.apache.hadoop.mapreduce.v2.hs.HSAuditLogger=${mapreduce.hs.audit.logger}
#log4j.additivity.org.apache.hadoop.mapreduce.v2.hs.HSAuditLogger=false
#log4j.appender.HSAUDIT=org.apache.log4j.DailyRollingFileAppender
#log4j.appender.HSAUDIT.File=${hadoop.log.dir}/hs-audit.log
#log4j.appender.HSAUDIT.layout=org.apache.log4j.PatternLayout
#log4j.appender.HSAUDIT.layout.ConversionPattern=%d{ISO8601}?%p?%c{2}:?%m%n
#log4j.appender.HSAUDIT.DatePattern=.yyyy-MM-dd

#?Http?Server?Request?Logs
#log4j.logger.http.requests.namenode=INFO,namenoderequestlog
#log4j.appender.namenoderequestlog=org.apache.hadoop.http.HttpRequestLogAppender
#log4j.appender.namenoderequestlog.Filename=${hadoop.log.dir}/jetty-namenode-yyyy_mm_dd.log
#log4j.appender.namenoderequestlog.RetainDays=3

#log4j.logger.http.requests.datanode=INFO,datanoderequestlog
#log4j.appender.datanoderequestlog=org.apache.hadoop.http.HttpRequestLogAppender
#log4j.appender.datanoderequestlog.Filename=${hadoop.log.dir}/jetty-datanode-yyyy_mm_dd.log
#log4j.appender.datanoderequestlog.RetainDays=3

#log4j.logger.http.requests.resourcemanager=INFO,resourcemanagerrequestlog
#log4j.appender.resourcemanagerrequestlog=org.apache.hadoop.http.HttpRequestLogAppender
#log4j.appender.resourcemanagerrequestlog.Filename=${hadoop.log.dir}/jetty-resourcemanager-yyyy_mm_dd.log
#log4j.appender.resourcemanagerrequestlog.RetainDays=3

#log4j.logger.http.requests.jobhistory=INFO,jobhistoryrequestlog
#log4j.appender.jobhistoryrequestlog=org.apache.hadoop.http.HttpRequestLogAppender
#log4j.appender.jobhistoryrequestlog.Filename=${hadoop.log.dir}/jetty-jobhistory-yyyy_mm_dd.log
#log4j.appender.jobhistoryrequestlog.RetainDays=3

#log4j.logger.http.requests.nodemanager=INFO,nodemanagerrequestlog
#log4j.appender.nodemanagerrequestlog=org.apache.hadoop.http.HttpRequestLogAppender
#log4j.appender.nodemanagerrequestlog.Filename=${hadoop.log.dir}/jetty-nodemanager-yyyy_mm_dd.log
#log4j.appender.nodemanagerrequestlog.RetainDays=3

Chapter 18: Business ETL Implementation

18.1 Functionality

  • Filtering: drop invalid records, e.g. records missing a uuid or the session IP, or order events missing an order id.
  • Enrichment: derive region information (country, province, city, etc.) from the IP address, parse the browser information, fill in the server time, and so on.

18.2 Data

18.2.1 Upload Methods
  • Flume: as long as Flume is running normally, all logs are uploaded and written by Flume (see section 13.4).
  • Manual shell script: when the Flume process fails, the upload script has to be run by hand (see Chapter 15).
18.2.2 Flow
  • Use MapReduce: read the data from HDFS into the map phase via TextInputFormat and finally write it into HBase via TableOutputFormat.
18.2.3 Detailed Analysis

Log parsing

Logs are stored on HDFS, one record per line. Parse the concrete key-value pairs of the user action out of each line, then URL-decode them.

IP address resolution / enrichment

Browser (user-agent) parsing

HBase rowkey design

Rules to keep in mind: keep the rowkey as short as possible so that it uses little memory, and distribute it as evenly as possible (i.e. hash it).

HBase table creation

Tables are created through the Java API.

18.3 Code Implementation

Key class:

LoggerUtil.java

Sample code:

package com.z.transformer.util;

import java.net.URLDecoder;
import java.util.HashMap;
import java.util.Map;

import org.apache.commons.lang.StringUtils;
import org.apache.log4j.Logger;

import com.z.transformer.common.EventLogConstants;
import com.z.transformer.util.IPSeekerExt.RegionInfo;

import cz.mallat.uasparser.UserAgentInfo;

public class LoggerUtil {

    // Logger for diagnostic output
    private static final Logger logger = Logger.getLogger(LoggerUtil.class);

    /**
     * Parse the given log line. Returns a populated map on success and an empty map on failure.
     *
     * @param logText
     * @return
     */
    public static Map<String, String> handleLogText(String logText) {
        Map<String, String> result = new HashMap<String, String>();
        // 1. Start parsing
        // The Hadoop cluster ships only the jar containing org.apache.commons.lang.StringUtils by default;
        // if another StringUtils is used, its jar must be added to the cluster as well.
        if (StringUtils.isNotBlank(logText)) {
            // The log line is not blank, so it can be parsed
            String[] splits = logText.trim().split(EventLogConstants.LOG_SEPARTIOR); // log field separator ^A
            // e.g. 192.168.25.102^A1555318954.798^A/what.png?u_nu=1&u_sd=6D4F89C0-E17B-45D0-BFE0-059644C1878D&c_time=......
            if (splits.length == 3) {
                // The log format is correct; parse it
                String ip = splits[0].trim();
                // Put the ip address into the map
                result.put(EventLogConstants.LOG_COLUMN_NAME_IP, ip);
                long serverTime = TimeUtil.parseNginxServerTime2Long(splits[1].trim());
                if (serverTime != -1L) {
                    // The server time parsed correctly; serverTime is the corresponding timestamp in milliseconds
                    result.put(EventLogConstants.LOG_COLUMN_NAME_SERVER_TIME, String.valueOf(serverTime));
                }

                // Get the request body
                String requestBody = splits[2].trim();
                int index = requestBody.indexOf("?"); // index of the ? character
                if (index >= 0 && index != requestBody.length() - 1) {
                    // The request contains a ? and it is not the last character: keep everything after it
                    requestBody = requestBody.substring(index + 1);
                } else {
                    requestBody = null;
                }

                if (StringUtils.isNotBlank(requestBody)) {
                    // Not blank: handle the request parameters
                    handleRequestBody(result, requestBody);

                    // Enrich the ip address with region information
                    RegionInfo info = IPSeekerExt.getInstance().analysisIp(result.get(EventLogConstants.LOG_COLUMN_NAME_IP)); // the user's ip address
                    if (info != null) {
                        result.put(EventLogConstants.LOG_COLUMN_NAME_COUNTRY, info.getCountry()); // country
                        result.put(EventLogConstants.LOG_COLUMN_NAME_PROVINCE, info.getProvince()); // province
                        result.put(EventLogConstants.LOG_COLUMN_NAME_CITY, info.getCity()); // city
                    }

                    // Enrich the browser information
                    UserAgentInfo uaInfo = UserAgentUtil.analyticUserAgent(result.get(EventLogConstants.LOG_COLUMN_NAME_USER_AGENT)); // the browser user agent parameter
                    if (uaInfo != null) {
                        // Browser name
                        result.put(EventLogConstants.LOG_COLUMN_NAME_BROWSER_NAME, uaInfo.getUaFamily());
                        // Browser version
                        result.put(EventLogConstants.LOG_COLUMN_NAME_BROWSER_VERSION, uaInfo.getBrowserVersionInfo());
                        // Operating system the browser runs on
                        result.put(EventLogConstants.LOG_COLUMN_NAME_OS_NAME, uaInfo.getOsFamily());
                        // Version of that operating system
                        result.put(EventLogConstants.LOG_COLUMN_NAME_OS_VERSION, uaInfo.getOsName());
                    }

                } else {
                    // log it
                    logger.debug("Empty request parameters: " + logText);
                    result.clear(); // clear the partially filled map
                }
            } else {
                // log it
                logger.debug("Malformed log line: " + logText);
            }
        } else {
            logger.debug("Blank log line, nothing to parse: " + logText);
        }
        return result;
    }

    /**
     * Handle the request parameters.<br/>
     * The results are stored into the given map.
     *
     * @param clientInfo
     *            map collecting the final user behaviour data
     * @param requestBody
     *            the user behaviour data from the request parameters, formatted like:
     *            u_nu=1&u_sd=6D4F89C0-E17B-45D0-BFE0-059644C1878D&c_time=
     *            1450569596991&ver=1&en=e_l&pl=website&sdk=js&b_rst=1440*900&
     *            u_ud=4B16B8BB-D6AA-4118-87F8-C58680D22657&b_iev=Mozilla%2F5.0%
     *            20(Windows%20NT%205.1)%20AppleWebKit%2F537.36%20(KHTML%2C%
     *            20like%20Gecko)%20Chrome%2F45.0.2454.101%20Safari%2F537.36&l=
     *            zh-CN&bf_sid=33cbf257-3b11-4abd-ac70-c5fc47afb797_11177014
     */
    private static void handleRequestBody(Map<String, String> clientInfo, String requestBody) {
        // Split the request body on &
        String[] parameters = requestBody.split("&");
        for (String parameter : parameters) {
            // Each parameter looks like c_time=1450569596991, with = appearing exactly once
            String[] params = parameter.split("=");
            String key, value = null;
            try {
                // Decode with utf-8
                key = URLDecoder.decode(params[0].trim(), "utf-8");
                value = URLDecoder.decode(params[1].trim(), "utf-8");
                // Add to the result map
                clientInfo.put(key, value);
            } catch (Exception e) {
                logger.warn("Failed to decode: " + parameter, e);
            }
        }
    }
}
18.3.1 Log Parsing
18.3.2 IP Address Resolution / Enrichment

Resolving IP addresses with the Taobao interface

Website: http://ip.taobao.com/

Example REST API: http://ip.taobao.com/service/getIpInfo.php?ip=123.125.71.38

Limit: 10 QPS (queries per second). A hedged sketch of calling this interface follows.
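The class below is only a sketch that uses the Gson dependency already declared in the pom; the response layout assumed here ({"code":0,"data":{"country":...,"region":...,"city":...}}) is how this interface has commonly been documented and may differ, so treat the field names as assumptions. Remember the 10 QPS limit (cache or batch lookups).

import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URL;
import java.util.HashMap;
import java.util.Map;

import com.google.gson.JsonObject;
import com.google.gson.JsonParser;

public class TaobaoIpResolver {

    // Returns country/province/city for the given IP, or an empty map when the lookup fails.
    public static Map<String, String> resolve(String ip) throws IOException {
        String api = "http://ip.taobao.com/service/getIpInfo.php?ip=" + ip;
        try (InputStreamReader reader = new InputStreamReader(new URL(api).openStream(), "UTF-8")) {
            JsonObject resp = new JsonParser().parse(reader).getAsJsonObject();
            Map<String, String> region = new HashMap<String, String>();
            if (resp.get("code").getAsInt() == 0) {               // 0 means success (assumed)
                JsonObject data = resp.getAsJsonObject("data");
                region.put("country", data.get("country").getAsString());
                region.put("province", data.get("region").getAsString()); // "region" holds the province (assumed)
                region.put("city", data.get("city").getAsString());
            }
            return region;
        }
    }
}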

Using a third-party IP database

Resolve IPs against a file that already stores the mapping from IP ranges to regions. Because such files are not updated promptly, a small number of IPs may resolve incorrectly (a low-probability event). (Recommended: the 纯真 (CZ88) IP database.)

Using your own IP database

Starting from a third-party IP database, gradually build up your own IP database and maintain it yourself.

IP database table design

  • startip (start of the IP range)
  • endip (end of the IP range)
  • country
  • province
  • city

Tip: resolving an IP boils down to checking whether it falls between the start IP and the end IP of some region's range.

Utility methods for converting between an IP string and a long:

Sample code:

    // Convert an IP address of the form 127.0.0.1 into a decimal long
    public long IpToLong(String strIp) {
        long[] ip = new long[4];
        int position1 = strIp.indexOf(".");
        int position2 = strIp.indexOf(".", position1 + 1);
        int position3 = strIp.indexOf(".", position2 + 1);
        // Parse the string between each pair of dots into an integer
        // (substring takes an absolute end index, not a length)
        ip[0] = Long.parseLong(strIp.substring(0, position1));
        ip[1] = Long.parseLong(strIp.substring(position1 + 1, position2));
        ip[2] = Long.parseLong(strIp.substring(position2 + 1, position3));
        ip[3] = Long.parseLong(strIp.substring(position3 + 1));
        // Shift the four octets into place
        return (ip[0] << 24) + (ip[1] << 16) + (ip[2] << 8) + ip[3];
    }

    // Convert a decimal long back into an IP address of the form 127.0.0.1
    public String LongToIp(long ip) {
        StringBuilder sb = new StringBuilder();
        // Shift right by 24 bits
        sb.append(ip >> 24);
        sb.append(".");
        // Clear the high 8 bits, then shift right by 16
        sb.append((ip & 0x00FFFFFF) >> 16);
        sb.append(".");
        // Clear the high 16 bits, then shift right by 8
        sb.append((ip & 0x0000FFFF) >> 8);
        sb.append(".");
        // Clear the high 24 bits
        sb.append((ip & 0x000000FF));
        return sb.toString();
    }
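Once the IP ranges from the IP-database table have been loaded into two arrays sorted by start IP (converted with IpToLong above), the containing range can be found with a binary search. The sketch below is illustrative; how the arrays are loaded (from MySQL or from a file) is left to the project.

    // Find the index of the range that contains the given IP, or -1 when no range matches.
    public int findRegionIndex(long[] startIps, long[] endIps, long ip) {
        int low = 0, high = startIps.length - 1;
        while (low <= high) {
            int mid = (low + high) >>> 1;
            if (ip < startIps[mid]) {
                high = mid - 1;            // the matching range can only be to the left
            } else if (ip > endIps[mid]) {
                low = mid + 1;             // the matching range can only be to the right
            } else {
                return mid;                // startIps[mid] <= ip <= endIps[mid]
            }
        }
        return -1;
    }

    // Example: int idx = findRegionIndex(startIps, endIps, IpToLong("123.125.71.38"));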
18.3.3 Browser Information Parsing
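The post does not show the project's UserAgentUtil that LoggerUtil calls above, so the class below is only a hedged sketch of one possible shape built on the uasparser dependency from the pom; the exact bootstrap call may differ between uasparser versions. The getters used on UserAgentInfo match the ones used in LoggerUtil.

package com.z.transformer.util;

import java.io.IOException;

import cz.mallat.uasparser.OnlineUpdater;
import cz.mallat.uasparser.UASparser;
import cz.mallat.uasparser.UserAgentInfo;

public class UserAgentUtil {

    private static UASparser parser = null;

    static {
        try {
            // Load the user-agent definition file bundled with the uasparser jar.
            // The exact bootstrap call is version-dependent; treat it as an assumption.
            parser = new UASparser(OnlineUpdater.getVendoredInputStream());
        } catch (IOException e) {
            throw new ExceptionInInitializerError(e);
        }
    }

    // Parse a raw user-agent string; returns null when the string is blank or parsing fails.
    public static UserAgentInfo analyticUserAgent(String userAgent) {
        if (userAgent == null || userAgent.trim().isEmpty()) {
            return null;
        }
        try {
            return parser.parse(userAgent);
        } catch (IOException e) {
            return null;
        }
    }
}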
18.3.4 Writing the ETL Code

New classes:

AnalysisDataMapper.java

AnalysisDataRunner.java

Goal: read the data from HDFS, clean it, and write it into HBase.

Outline of the core logic:

  • Step 1: Create the AnalysisDataMapper class and override the map method.
  • Step 2: In map, parse the current line into a Map<String, String> collection clientInfo via LoggerUtil.handleLogText.
  • Step 3: Read the event type of the current log line and match it against the enum to build an EventEnum object; if no matching event type exists, the result is null.
  • Step 4: If the given event type cannot be handled, report it through log4j.
  • Step 5: If the event type can be handled, process the event in a new method handleEventData(Map<String, String> clientInfo, EventEnum event, Context context, Text value).
  • Step 6: In handleEventData, filter out events whose data is invalid via filterEventData(Map<String, String> clientInfo, EventEnum event). Filter rule: data coming from java_server must carry a member id; data coming from website must carry a session id and a user id.
  • Step 7: If an event does not pass the filter, log the current record; if it does, prepare the output in a new method outPutData(Map<String, String> clientInfo, Context context).
  • Step 8: In outPutData some useless data can be dropped, e.g. the raw browser information (it has already been parsed). A rowkey generator generateRowKey(String uuid, long serverTime, Map<String, String> clientInfo) is also needed; once the rowkey has been generated, the record is written to the HBase table.
  • Step 9: generateRowKey builds the rowkey by concatenating: time + CRC32 of the uuid + CRC32 of the hash code of the data, 12 bytes in total.

Sample code:

AnalysisDataMapper.java

package com.z.transformer.mr.etl;

import java.io.IOException;
import java.util.Map;
import java.util.zip.CRC32;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.log4j.Logger;

import com.z.transformer.common.EventLogConstants;
import com.z.transformer.common.EventLogConstants.EventEnum;
import com.z.transformer.util.LoggerUtil;
import com.z.transformer.util.TimeUtil;

public class AnalysisDataMapper extends Mapper<Object, Text, NullWritable, Put> {
    // Object is the input offset, Text is the input line; NullWritable and Put are the output key/value types

    // Events that cannot be handled are reported via log4j; the Logger output shows up on the console running the jar
    private static final Logger logger = Logger.getLogger(AnalysisDataMapper.class);

    private CRC32 crc1 = null;
    private CRC32 crc2 = null;
    private byte[] family = null;
    private long currentDayInMills = -1;

    /**
     * Initialize per-task state
     */
    @Override
    protected void setup(Mapper<Object, Text, NullWritable, Put>.Context context)
            throws IOException, InterruptedException {
        crc1 = new CRC32();
        crc2 = new CRC32();
        this.family = EventLogConstants.BYTES_EVENT_LOGS_FAMILY_NAME;
        currentDayInMills = TimeUtil.getTodayInMillis();
    }

    // 1. Override the map method
    @Override
    protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        // 2. Parse the raw line into a key-value map via LoggerUtil
        Map<String, String> clientInfo = LoggerUtil.handleLogText(value.toString());

        // 2.1 If parsing failed the map is empty; log the current line
        if (clientInfo.isEmpty()) {
            logger.debug("Failed to parse log line: " + value.toString());
            return;
        }

        // 3. Build the corresponding EventEnum from the parsed data (matched by the enum alias)
        EventEnum event = EventEnum.valueOfAlias(clientInfo.get(EventLogConstants.LOG_COLUMN_NAME_EVENT_NAME));
        if (event == null) {
            // 4. Event that cannot be handled: just log the event type
            logger.debug("No matching event type: " + clientInfo.get(EventLogConstants.LOG_COLUMN_NAME_EVENT_NAME));
        } else {
            // 5. Handle the concrete event
            handleEventData(clientInfo, event, context, value);
            // clientInfo: parsed data; event: event type; context: MR context (used to write to HBase);
            // value: the raw line (may be needed for further filtering)
        }
    }

    /**
     * Handle a concrete event.
     *
     * @param clientInfo
     * @param event
     * @param context
     * @param value
     * @throws InterruptedException
     * @throws IOException
     */
    public void handleEventData(Map<String, String> clientInfo, EventEnum event, Context context, Text value)
            throws IOException, InterruptedException {
        // 6. If the event passes the filter, go on and process it
        if (filterEventData(clientInfo, event)) {
            outPutData(clientInfo, context);
        } else {
            // The event did not pass the filter; log the current record
            logger.debug("Malformed event: " + value.toString());
        }
    }

    /**
     * 6. Filter the events; only events that pass the filter are stored in HBase.
     *
     * @param clientInfo
     * @param event
     * @return
     */
    public boolean filterEventData(Map<String, String> clientInfo, EventEnum event) {
        // Global filter applied to all events (the concrete conditions depend on the business;
        // "server time" and "platform" here are examples)
        boolean result = StringUtils.isNotBlank(clientInfo.get(EventLogConstants.LOG_COLUMN_NAME_SERVER_TIME))
                && StringUtils.isNotBlank(clientInfo.get(EventLogConstants.LOG_COLUMN_NAME_PLATFORM));
        // Almost everything below is combined with &&; a single false means the event cannot be handled

        // public static final String PC_WEBSITE_SDK = "website";
        // public static final String JAVA_SERVER_SDK = "java_server";

        // First decide the platform
        switch (clientInfo.get(EventLogConstants.LOG_COLUMN_NAME_PLATFORM)) {
        // Data sent by the Java Server platform
        case EventLogConstants.PlatformNameConstants.JAVA_SERVER_SDK:
            result = result && StringUtils.isNotBlank(clientInfo.get(EventLogConstants.LOG_COLUMN_NAME_MEMBER_ID)); // the member id must exist
            // Then decide the event
            switch (event) {
            case CHARGEREFUND:
                // refund event
                // ......
                break;
            case CHARGESUCCESS:
                // order payment success event
                result = result && StringUtils.isNotBlank(clientInfo.get(EventLogConstants.LOG_COLUMN_NAME_ORDER_ID));
                break;
            default:
                logger.debug("Cannot handle this event: " + clientInfo);
                result = false;
                break;
            }
            break;

        // Data sent by the WebSite platform
        case EventLogConstants.PlatformNameConstants.PC_WEBSITE_SDK:
            // Decide the event
            switch (event) {
            case CHARGEREQUEST:
                // order placement event
                result = result && StringUtils.isNotBlank(clientInfo.get(EventLogConstants.LOG_COLUMN_NAME_ORDER_ID))
                        && StringUtils.isNotBlank(clientInfo.get(EventLogConstants.LOG_COLUMN_NAME_ORDER_CURRENCY_TYPE))
                        && StringUtils.isNotBlank(clientInfo.get(EventLogConstants.LOG_COLUMN_NAME_ORDER_PAYMENT_TYPE))
                        && StringUtils.isNotBlank(clientInfo.get(EventLogConstants.LOG_COLUMN_NAME_ORDER_CURRENCY_AMOUNT));
                break;
            case EVENT:
                // Event event
                result = result && StringUtils.isNotBlank(clientInfo.get(EventLogConstants.LOG_COLUMN_NAME_EVENT_CATEGORY))
                        && StringUtils.isNotBlank(clientInfo.get(EventLogConstants.LOG_COLUMN_NAME_EVENT_ACTION));
                break;
            case LAUNCH:
                // Launch (site visit) event
                // ......
                break;
            case PAGEVIEW:
                // PV event
                result = result && StringUtils.isNotBlank(clientInfo.get(EventLogConstants.LOG_COLUMN_NAME_CURRENT_URL));
                break;
            default:
                logger.debug("Cannot handle this event: " + clientInfo);
                result = false;
                break;
            }
            break;

        default:
            result = false;
            logger.debug("Unknown data source: " + clientInfo);
            break;
        }

        return result;
    }

    /**
     * 7 and 8. Write an event that passed the filter out to HBase.
     *
     * @param clientInfo
     * @param context
     * @throws IOException
     * @throws InterruptedException
     */
    public void outPutData(Map<String, String> clientInfo, Context context) throws IOException, InterruptedException {
        String uuid = clientInfo.get(EventLogConstants.LOG_COLUMN_NAME_UUID);
        long serverTime = Long.valueOf(clientInfo.get(EventLogConstants.LOG_COLUMN_NAME_SERVER_TIME));

        // The browser information has already been parsed, so drop the raw user agent
        clientInfo.remove(EventLogConstants.LOG_COLUMN_NAME_USER_AGENT);

        // Build the rowkey
        byte[] rowkey = generateRowKey(uuid, serverTime, clientInfo);
        Put put = new Put(rowkey);

        for (Map.Entry<String, String> entry : clientInfo.entrySet()) {
            if (StringUtils.isNotBlank(entry.getKey()) && StringUtils.isNotBlank(entry.getValue())) {
                put.addColumn(family, Bytes.toBytes(entry.getKey()), Bytes.toBytes(entry.getValue()));
            }
        }

        context.write(NullWritable.get(), put);
    }

    /**
     * 9. Writing to HBase requires a Put, and a Put requires a rowkey, hence this method.
     *
     * rowKey = time + crc32 of the uuid + crc32 of the hash code of the data, 12 bytes in total
     *
     * @return
     */
    public byte[] generateRowKey(String uuid, long serverTime, Map<String, String> clientInfo) {
        // Reset crc1 and crc2 first
        crc1.reset();
        crc2.reset();

        // time = (time the record hit the server) - (timestamp of 00:00 of the current day);
        // the maximum value is 3600*24*1000 = 86400000, which fits into an int, i.e. 4 bytes
        byte[] timeBytes = Bytes.toBytes((int) (serverTime - this.currentDayInMills));

        // crc32 of the uuid (a CRC32 checksum is a 32-bit value, so 4 bytes are enough)
        if (StringUtils.isNotBlank(uuid)) {
            this.crc1.update(Bytes.toBytes(uuid));
        }
        byte[] uuidBytes = Bytes.toBytes((int) this.crc1.getValue());

        // crc32 of the hash code of the data, 4 bytes as well
        this.crc2.update(Bytes.toBytes(clientInfo.hashCode()));
        byte[] clientInfoBytes = Bytes.toBytes((int) this.crc2.getValue());

        // Combined byte array (4 + 4 + 4 = 12 bytes)
        byte[] buffer = new byte[timeBytes.length + uuidBytes.length + clientInfoBytes.length];
        // Merge the arrays
        System.arraycopy(timeBytes, 0, buffer, 0, timeBytes.length);
        System.arraycopy(uuidBytes, 0, buffer, timeBytes.length, uuidBytes.length);
        System.arraycopy(clientInfoBytes, 0, buffer, timeBytes.length + uuidBytes.length, clientInfoBytes.length);

        return buffer;
    }
}

AnalysisDataRunner.java

package com.z.transformer.mr.etl;

import java.io.File;
import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import com.z.transformer.common.EventLogConstants;
import com.z.transformer.common.GlobalConstants;
import com.z.transformer.util.TimeUtil;

public class AnalysisDataRunner implements Tool {
    private Configuration conf = null;

    public static void main(String[] args) {
        try {
            int resultCode = ToolRunner.run(new AnalysisDataRunner(), args);
            if (resultCode == 0) {
                System.out.println("Success!");
            } else {
                System.out.println("Fail!");
            }
            System.exit(resultCode);
        } catch (Exception e) {
            e.printStackTrace();
            System.exit(1);
        }
    }

    @Override
    public void setConf(Configuration conf) {
        // Instantiate the Configuration first
        this.conf = HBaseConfiguration.create(conf);
    }

    @Override
    public Configuration getConf() {
        // Global accessor
        return this.conf;
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = this.getConf();
        // Handle the date argument; if it is missing or invalid, yesterday's date is used
        this.processArgs(conf, args);

        // Create the Job
        Job job = Job.getInstance(conf, "Event-ETL");

        // Job settings
        job.setJarByClass(AnalysisDataRunner.class);

        // Mapper settings
        job.setMapperClass(AnalysisDataMapper.class);
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(Put.class);

        // Reducer settings (map-only job)
        job.setNumReduceTasks(0);

        // Configure the input
        initJobInputPath(job);

        // Configure the output to HBase
        initHBaseOutPutConfig(job);
        // job.setJar("target/transformer-0.0.1-SNAPSHOT.jar");

        // Submit the Job
        return job.waitForCompletion(true) ? 0 : 1;
    }

    /**
     * Initialize the Job's input directory.
     *
     * @param job
     * @throws IOException
     */
    private void initJobInputPath(Job job) throws IOException {
        Configuration conf = job.getConfiguration();
        // The day whose data is to be ETL'ed
        String date = conf.get(GlobalConstants.RUNNING_DATE_PARAMES); // 2017-08-14
        // Format the HDFS path
        String hdfsPath = TimeUtil.parseLong2String(TimeUtil.parseString2Long(date), "yyyy/MM/dd"); // 2017/08/14

        if (GlobalConstants.HDFS_LOGS_PATH_PREFIX.endsWith("/")) {
            hdfsPath = GlobalConstants.HDFS_LOGS_PATH_PREFIX + hdfsPath; // /event-logs/2017/08/14
        } else {
            hdfsPath = GlobalConstants.HDFS_LOGS_PATH_PREFIX + File.separator + hdfsPath; // /event-logs/2017/08/14
            // File.separator is the platform-specific separator: \ on Windows, / on Linux
        }

        FileSystem fs = FileSystem.get(conf);
        Path inPath = new Path(hdfsPath);

        if (fs.exists(inPath)) {
            FileInputFormat.addInputPath(job, inPath);
        } else {
            throw new RuntimeException("The directory does not exist on HDFS: " + hdfsPath);
        }
    }

    /**
     * Configure the output to HBase.
     *
     * @throws IOException
     */
    private void initHBaseOutPutConfig(Job job) throws IOException {
        Configuration conf = job.getConfiguration();
        // The day whose data is to be ETL'ed
        String date = conf.get(GlobalConstants.RUNNING_DATE_PARAMES); // 2017-08-14
        // Format the suffix of the HBase table name
        String tableNameSuffix = TimeUtil.parseLong2String(TimeUtil.parseString2Long(date), TimeUtil.HBASE_TABLE_NAME_SUFFIX_FORMAT); // 20170814
        // Build the table name
        String tableName = EventLogConstants.HBASE_NAME_EVENT_LOGS + tableNameSuffix; // event_logs20170814

        // Configure the output (initializes the reducer side of the job)
        TableMapReduceUtil.initTableReducerJob(tableName, null, job);

        Connection conn = null;
        Admin admin = null;

        // Use the new HBase API
        conn = ConnectionFactory.createConnection(conf);
        admin = conn.getAdmin();

        // Build the table descriptor from the table name
        TableName tn = TableName.valueOf(tableName);
        HTableDescriptor htd = new HTableDescriptor(tn);

        // Set the column family
        htd.addFamily(new HColumnDescriptor(EventLogConstants.EVENT_LOGS_FAMILY_NAME));
        // Check whether the table already exists
        if (admin.tableExists(tn)) {
            // It exists: delete it
            if (admin.isTableEnabled(tn)) {
                // Disable the table first
                admin.disableTable(tn);
            }
            // Then drop it
            admin.deleteTable(tn);
        }

        // Create the table; pre-splitting can be considered here,
        // e.g. 3 split keys producing 4 regions:
        // byte[][] keySplits = new byte[3][];
        // keySplits[0] = Bytes.toBytes("1");
        // keySplits[1] = Bytes.toBytes("2");
        // keySplits[2] = Bytes.toBytes("3");
        // admin.createTable(htd, keySplits);

        admin.createTable(htd);
        admin.close();
        conn.close();
    }

    /**
     * Handle the date argument; if none is passed, yesterday's data is cleaned by default.
     *
     * Job command: bin/yarn jar ETL.jar com.z.transformer.mr.etl.AnalysisDataRunner -date 2017-08-14
     *
     * @param args
     */
    private void processArgs(Configuration conf, String[] args) {
        String date = null;
        for (int i = 0; i < args.length; i++) {
            if ("-date".equals(args[i])) { // find the "-date" flag
                date = args[i + 1]; // take the date value
                break;
            }
        }

        if (StringUtils.isBlank(date) || !TimeUtil.isValidateRunningDate(date)) {
            // No argument passed: clean yesterday's data and store it into HBase
            date = TimeUtil.getYesterday();
        }
        // Save the target date string into conf so that it can be read anywhere in the job
        conf.set(GlobalConstants.RUNNING_DATE_PARAMES, date);
    }
}

18.4 Testing

18.4.1 Upload Test Data
$ /opt/module/hadoop-2.7.2/bin/hdfs dfs -mkdir -p /event-logs/2015/12/20
$ /opt/module/hadoop-2.7.2/bin/hdfs dfs -put /opt/software/20151220.log /event-logs/2015/12/20
18.4.2 Package and Run on the Cluster

Option 1:

Modify the HADOOP_CLASSPATH setting in etc/hadoop/hadoop-env.sh.

For example:

    export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:/opt/module/hbase/lib/*

Option 2:

Use the maven-shade-plugin Maven plugin to bundle all third-party jars into the final jar; the plugin is configured in pom.xml. See the pom.xml in Chapter 17, Tool Code Import.

Build parameters:

    1. -P local clean package (does not bundle third-party jars)
    2. -P dev clean package install (bundles third-party jars; recommended)

After the build succeeds, upload the jar to the Linux machine and run the following command:

/opt/module/hadoop-2.7.2/bin/yarn jar /opt/software/transformer-0.0.1-SNAPSHOT.jar com.z.transformer.mr.etl.AnalysisDataRunner -date 2015-12-20

The test succeeds. The result can be checked in three places (screenshots omitted):

1. The console output

2. The HBase web UI: http://hadoop102:16010/master-status

3. The JobHistory server: http://hadoop102:19888/jobhistory/attempts/job_1555404378493_0005/m/SUCCESSFUL

Tip: if a jar your job depends on was not included by the maven-shade-plugin (org.apache.maven.plugins), add that jar to HADOOP_CLASSPATH.

For example, the code depends on HBase, but the MR job jar was packaged without the HBase jars; in that case run the following on the command line:

export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:/opt/module/hbase/lib/*

Before running the job, first manually delete the existing table and namespace in HBase:

hbase(main):002:0> disable 'event_logs20151220'
hbase(main):003:0> drop 'event_logs20151220'
hbase(main):005:0> drop_namespace 'ns_ct'

Problem: when checking the logs on the JobHistory server, a decoding exception shows up: java.lang.IllegalArgumentException: URLDecoder: Incomplete trailing escape (%) pattern (screenshot omitted).

Link to the fix: /developer/article/1417287
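The linked article describes the fix used in the project. As a general illustration only, one common guard is to escape any stray '%' that is not followed by two hex digits before decoding; the helper below is a hedged sketch, not the project's code.

// Sketch of a defensive decode: rewrite lone '%' as "%25" so URLDecoder does not throw
// "Incomplete trailing escape (%) pattern" on truncated or malformed parameters.
public static String safeDecode(String value, String charset) {
    String sanitized = value.replaceAll("%(?![0-9a-fA-F]{2})", "%25");
    try {
        return java.net.URLDecoder.decode(sanitized, charset);
    } catch (java.io.UnsupportedEncodingException e) {
        return value; // fall back to the raw value if the charset is unknown
    }
}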

Chapter 19: Creating Database Tables

19.1 Using Navicat

Prerequisite: remote access to MySQL has to be granted on the Linux host.

grant all on *.* to root@'%' identified by '123456';
flush privileges;
exit;

19.2 Creating the Tables from a SQL File

Reference link: /developer/article/1380586
