Spark learning notes: you can follow along here: https://github.com/MachineLP/Spark-
This post focuses on the following questions:
1. Installing PySpark on Mac
See the edX course Big Data Analytics Using Spark: https://courses.edx.org/courses/course-v1:UCSanDiegoX+DSE230x+1T2018/courseware/b341cd4498054fa089cc99dcadd5875a/13b26725c3564f73ba763ef209b8449e/1?activate_block_id=block-v1%3AUCSanDiegoX%2BDSE230x%2B1T2018%2Btype%40vertical%2Bblock%40ff752a67a23547db9efbc7769dc93987
To list the JAVA_HOME of every installed JDK, use the command: /usr/libexec/java_home -V
After downloading Spark, you can use it without any system-wide configuration by setting the paths from Python:
import os
import sys

# adjust these paths to your own Spark and Java installation directories
os.environ['SPARK_HOME'] = "/Users/liupeng/spark/spark-2.4.0-bin-hadoop2.7/"
sys.path.append("/Users/liupeng/spark/spark-2.4.0-bin-hadoop2.7/bin")
sys.path.append("/Users/liupeng/spark/spark-2.4.0-bin-hadoop2.7/python")
sys.path.append("/Users/liupeng/spark/spark-2.4.0-bin-hadoop2.7/python/lib")
sys.path.append("/Users/liupeng/spark/spark-2.4.0-bin-hadoop2.7/python/lib/pyspark.zip")
# Spark 2.4.0 bundles py4j 0.10.7 under python/lib (the old lib/py4j-0.9 path does not exist)
sys.path.append("/Users/liupeng/spark/spark-2.4.0-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip")
os.environ['JAVA_HOME'] = "/Library/Java/JavaVirtualMachines/jdk1.8.0_144.jdk/Contents/Home"
from pyspark import SparkContext
from pyspark import SparkConf

sc = SparkContext("local", "testing")
print(sc.version)
x = sc.parallelize([1, 2, 3, 4])
y = x.map(lambda x: (x, x**3))
print(y.collect())
2. Spark basics
The relevant Spark basics are as follows:
Spark Context:
We start by creating a SparkContext object named sc. In this case we create a Spark context that uses 4 executors (one per core).
Only one SparkContext at a time!
# sc.stop() #commented out so that you don't stop your context by mistake
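A minimal sketch of creating such a context (local[4] asks for 4 local worker threads, one per core; the appName "demo" is an arbitrary label of mine):
# sc.stop()  # stop any running context first, since only one may be active
sc = SparkContext(master="local[4]", appName="demo")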
RDDs
RDD (or Resilient Distributed Dataset) is the main novel data structure in Spark. You can think of it as a list whose elements are stored on several computers.
The elements of each RDD are distributed across the worker nodes, which are the nodes that perform the actual computations. This notebook, however, is running on the driver node. As the RDD is not stored on the driver node, you cannot access it directly; the variable name is really just a pointer to a Python object which holds the information regarding the actual location of the elements.
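Since the elements live on the workers, printing the RDD variable only shows its metadata; bringing elements back to the driver requires an action such as take(). A small illustration, assuming the sc created above:
R = sc.parallelize(range(1000))
print(R)          # prints lineage information, not the elements
print(R.take(5))  # [0, 1, 2, 3, 4] -- ships 5 elements back to the driver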
Some basic RDD commands
Parallelize
A=sc.parallelize(range(3))
print (A)
output:PythonRDD[1] at RDD at PythonRDD.scala:48
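As an aside, parallelize also accepts an optional number of partitions, and getNumPartitions() reports how the RDD is split across workers; both are standard pyspark calls:
A2 = sc.parallelize(range(3), numSlices=2)
print(A2.getNumPartitions())
output:2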
Collect:
L = A.collect()
print (type(L))
print (L)
output:<class 'list'> [0, 1, 2]
Using .collect() eliminates the benefits of parallelism
It is often tempting to .collect() an RDD, make it into a list, and then process the list using standard Python. However, note that this means you are using only the driver node to perform the computation, which means that you are not getting any benefit from Spark.
Using RDD operations, as described below, will make use of all of the computers at your disposal.
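To make the contrast concrete, here is the same sum computed both ways (sum() runs only on the driver, reduce() on the workers):
L = A.collect()
print(sum(L))                        # 3, computed on the driver alone
print(A.reduce(lambda x, y: x + y))  # 3, computed in parallel on the workers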
Map
A.map(lambda x: x*x).collect()
output:[0, 1, 4]
Note: here we are using a lambda function; later we will see that regular functions can also be used.
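For example, the same map with an ordinary named function:
def square(x):
    return x * x

A.map(square).collect()
output:[0, 1, 4]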
Reduce
The simplest example of a 2-to-1 operation is the sum:
A.reduce(lambda x,y:x+y)
output:3
Here is an example of a reduce operation that finds the shortest string in an RDD of strings.
words=['this','is','the','best','mac','ever']
wordRDD=sc.parallelize(words)
wordRDD.reduce(lambda w,v: w if len(w)<len(v) else v)
output:'is'
Properties of reduce operations:
The order of the operands must not matter: addition gives 1 + 3 + 5 + 2 = 5 + 3 + 1 + 2, but subtraction gives 1 - 3 - 5 - 2 = -9 while 5 - 3 - 1 - 2 = -1.
The order of application (the parenthesization) must not matter: ((1 + 3) + 5) + 2 = (1 + 3) + (5 + 2), but ((1 - 3) - 5) - 2 differs from (1 - 3) - (5 - 2).
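The operand-order property can be checked with plain Python arithmetic on the numbers above:
print(1 + 3 + 5 + 2, 5 + 3 + 1 + 2)  # 11 11 -- addition is order-independent
print(1 - 3 - 5 - 2, 5 - 3 - 1 - 2)  # -9 -1 -- subtraction is not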
Why must reordering not change the result?
You can think about the reduce operation as a binary tree where the leaves are the elements of the list and the root is the final result. Each triplet of the form (parent, child1, child2) corresponds to a single application of the reduce function.
The order in which the reduce operation is applied is determined at run time and depends on how the RDD is partitioned across the cluster. There are many different orders to apply the reduce operation.
If we want the input RDD to uniquely determine the reduced value, all evaluation orders must yield the same final result. In addition, the order of the elements in the list must not change the result. In particular, reversing the order of the operands in a reduce function must not change the outcome.
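The evaluation trees can be written out explicitly in plain Python: each parenthesization below is one possible tree over the list [1, 3, 5, 2]:
print(((1 - 3) - 5) - 2)  # -9: the left-to-right (unbalanced) tree
print((1 - 3) - (5 - 2))  # -5: a balanced tree gives a different result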
For example, the arithmetic operations multiply (*) and add (+) can be used in a reduce, but subtract (-) and divide (/) should not.
Doing so will not raise an error, but the result is unpredictable.
B=sc.parallelize([1,3,5,2])
B.reduce(lambda x,y: x-y)
output:-9
Which of the following orders was executed: ((1-3)-5)-2 or (1-3)-(5-2)? Since the result above is -9, it must have been ((1-3)-5)-2.
Using regular functions instead of lambda functions
For comparison, here is the lambda version again:
A.reduce(lambda x,y: x+y)
output:3
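The equivalent with a named function:
def add(x, y):
    return x + y

A.reduce(add)
output:3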
Suppose we want to find the longest word, and among words of equal length, the lexicographically larger one.
We could achieve that as follows:
def largerThan(x, y):
    if len(x) > len(y): return x
    elif len(y) > len(x): return y
    else:  # lengths are equal, compare lexicographically
        if x > y:
            return x
        else:
            return y
wordRDD.reduce(largerThan)
output:'this'
Summary:
We saw how to create a SparkContext, parallelize data into an RDD, and use the basic RDD operations collect, map, and reduce.
The effect of changing the number of workers
from time import time
from pyspark import SparkContext

for j in range(1, 10):
    sc = SparkContext(master="local[%d]" % (j))
    t0 = time()
    for i in range(10):
        sc.parallelize([1, 2] * 1000000).reduce(lambda x, y: x + y)
    print("%2d executors, time=%4.3f" % (j, time() - t0))
    sc.stop()