OSS is one of the optional data stores for Hadoop/Spark batch jobs. This article demonstrates creating a file in OSS and then accessing it from Spark.

Preparing the data in OSS

1. Create a bucket.

2. Upload the file

The file can be uploaded with the OSS SDK, or written through Hadoop as described in Use HDP 2.6 Hadoop to read and write OSS data; here we simply upload it from the console.
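For reference, an upload with the OSS Java SDK would look roughly like the sketch below. This is a minimal sketch, not part of the original walkthrough: the endpoint, AccessKey pair, and local file path are placeholders, and it assumes the aliyun-sdk-oss dependency is on the classpath.

import java.io.File;

import com.aliyun.oss.OSS;
import com.aliyun.oss.OSSClientBuilder;

public class UploadToOss {
    public static void main(String[] args) {
        // Placeholders: use the public endpoint and your own credentials.
        String endpoint = "oss-cn-hangzhou.aliyuncs.com";
        String accessKeyId = "<your-ak-id>";
        String accessKeySecret = "<your-ak-secret>";

        OSS ossClient = new OSSClientBuilder().build(endpoint, accessKeyId, accessKeySecret);
        try {
            // Bucket and object key from this article's example.
            ossClient.putObject("liumi-hust", "A-Game-of-Thrones.txt",
                    new File("/path/to/A-Game-of-Thrones.txt"));
        } finally {
            ossClient.shutdown();
        }
    }
}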

Note down the bucket path: oss://liumi-hust/A-Game-of-Thrones.txt, and the endpoint: oss-cn-hangzhou-internal.aliyuncs.com. With that, the OSS data preparation is done.

Reading OSS data in a Spark application

1. Develop the application

Application development is no different from a traditional deployment.

SparkConf conf = new SparkConf().setAppName(WordCount.class.getSimpleName());
JavaSparkContext sc = new JavaSparkContext(conf);

JavaRDD<String> lines = sc.textFile("oss://liumi-hust/A-Game-of-Thrones.txt", 250);

...
wordsCountResult.saveAsTextFile("oss://liumi-hust/A-Game-of-Thrones-result");

sc.close();   
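Putting the pieces together, a complete version of the WordCount driver could look like the sketch below. This is a sketch only: the elided transformations ("...") above are assumed to be the standard word count, and the input/output paths are the ones used in this article.

package com.aliyun.liumi.spark.example;

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

public class WordCount {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName(WordCount.class.getSimpleName());
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Read the input file from OSS with 250 partitions, as above.
        JavaRDD<String> lines = sc.textFile("oss://liumi-hust/A-Game-of-Thrones.txt", 250);

        // Standard word count: split on spaces, pair each word with 1, sum per word.
        JavaPairRDD<String, Integer> wordsCountResult = lines
                .flatMap(line -> Arrays.asList(line.split(" ")).iterator())
                .mapToPair(word -> new Tuple2<>(word, 1))
                .reduceByKey(Integer::sum);

        wordsCountResult.saveAsTextFile("oss://liumi-hust/A-Game-of-Thrones-result");

        sc.close();
    }
}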

2. There are two common ways to configure OSS access: 1) a static configuration file; 2) setting the properties dynamically when the application is submitted.

1) Put the following core-site.xml into the application project's resources directory:

<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
  Licensed under the Apache License, Version 2.0 (the "License");
  you may not use this file except in compliance with the License.
  You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License. See accompanying LICENSE file.
-->

<!-- Put site-specific property overrides in this file. -->

<configuration>
    <!-- OSS configuration -->
    <property>
        <name>fs.oss.impl</name>
        <value>org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem</value>
    </property>
    <property>
        <name>fs.oss.endpoint</name>
        <value>oss-cn-hangzhou-internal.aliyuncs.com</value>
    </property>
    <property>
        <name>fs.oss.accessKeyId</name>
        <value>{temporary AK_ID}</value>
    </property>
    <property>
        <name>fs.oss.accessKeySecret</name>
        <value>{temporary AK_SECRET}</value>
    </property>
    <property>
        <name>fs.oss.buffer.dir</name>
        <value>/tmp/oss</value>
    </property>
    <property>
        <name>fs.oss.connection.secure.enabled</name>
        <value>false</value>
    </property>
    <property>
        <name>fs.oss.connection.maximum</name>
        <value>2048</value>
    </property>

</configuration>

2) Taking Spark as an example, the properties can also be set when submitting the application:

hadoopConf:
    # OSS
    "fs.oss.impl": "org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem"
    "fs.oss.endpoint": "oss-cn-hangzhou-internal.aliyuncs.com"
    "fs.oss.accessKeyId": "your ak-id"
    "fs.oss.accessKeySecret": "your ak-secret"

3. The packaged jar file must include all dependencies:

mvn assembly:assembly

The application's pom.xml is attached below:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.aliyun.liumi.spark</groupId>
    <artifactId>SparkExampleJava</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>2.4.3</version>
        </dependency>

        <dependency>
            <groupId>com.aliyun.dfs</groupId>
            <artifactId>aliyun-sdk-dfs</artifactId>
            <version>1.0.3</version>
        </dependency>

    </dependencies>

    <build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>2.6</version>
            <configuration>
                <appendAssemblyId>false</appendAssemblyId>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
                <archive>
                    <manifest>
                        <mainClass>com.aliyun.liumi.spark.example.WordCount</mainClass>
                    </manifest>
                </archive>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>assembly</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
    </build>
</project>

4. Write the Dockerfile

OSS:

# spark base image
FROM registry.cn-beijing.aliyuncs.com/eci_open/spark:2.4.4
RUN rm $SPARK_HOME/jars/kubernetes-client-*.jar
ADD https://repo1.maven.org/maven2/io/fabric8/kubernetes-client/4.4.2/kubernetes-client-4.4.2.jar $SPARK_HOME/jars
RUN mkdir -p /opt/spark/jars
COPY SparkExampleJava-1.0-SNAPSHOT.jar /opt/spark/jars
# OSS dependency jars
COPY aliyun-sdk-oss-3.4.1.jar /opt/spark/jars
COPY hadoop-aliyun-2.7.3.2.6.1.0-129.jar /opt/spark/jars
COPY jdom-1.1.jar /opt/spark/jars

For download locations of the OSS dependency jars, see Use HDP 2.6 Hadoop to read and write OSS data.

5. Build the application image

docker build -t registry.cn-beijing.aliyuncs.com/liumi/spark:2.4.4-example -f Dockerfile .

6. Push the image to Alibaba Cloud ACR

docker push registry.cn-beijing.aliyuncs.com/liumi/spark:2.4.4-example

With this, the Spark application image is ready. The next step is to deploy the Spark application in a Kubernetes cluster.