SparkOnYarn实战

这里已经部署好hadoop环境,以及spark

专注于为中小企业提供网站建设、成都网站建设服务,电脑端+手机端+微信端的三站合一,更高效的管理,为中小企业中卫免费做网站提供优质的服务。我们立足成都,凝聚了一批互联网行业人才,有力地推动了上1000+企业的稳健成长,帮助中小企业通过网站建设实现规模扩充和转变。

环境如下:

192.168.1.2  master

[hadoop@master ~]$ jps
2298 SecondaryNameNode
2131 NameNode
2593 JobHistoryServer
4363 Jps
3550 HistoryServer
2481 ResourceManager
3362 Master

192.168.1.3  slave1

[hadoop@slave1 ~]$ jps
2919 Jps
2464 Worker
1993 DataNode
2109 NodeManager

192.168.1.4 slave2

[hadoop@slave2 ~]$ jps
2762 Jps
2113 NodeManager
1998 DataNode
2452 Worker

这里以spark自带求pi值的python程序为例

[hadoop@master ~]$ cd spark
[hadoop@master spark]$ find . -name "pi.py"
[hadoop@master spark]$ cat ./examples/src/main/python/pi.py
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import sys
from random import random
from operator import add

from pyspark import SparkContext


if __name__ == "__main__":
    """
        Usage: pi [slices]
    """
    sc = SparkContext(appName="PythonPi")
    slices = int(sys.argv[1]) if len(sys.argv) > 1 else 2
    n = 100000 * slices

    def f(_):
        x = random() * 2 - 1
        y = random() * 2 - 1
        return 1 if x ** 2 + y ** 2 < 1 else 0

    count = sc.parallelize(xrange(1, n+1), slices).map(f).reduce(add)
    print "Pi is roughly %f" % (4.0 * count / n)
    


[hadoop@master spark]$ cd ./examples/src/main/python/ 
# 修改pi.py文件,在末尾添加
sc.stop()

[hadoop@master python]$ spark-submit --master spark://master:7077 --executor-memory 200m --driver-memory 200m pi.py 

# 如报下面错误,绑定hosts文件127.0.0.1为localhost
Traceback (most recent call last):
  File "/home/hadoop/spark-1.0.2-bin-hadoop2/examples/src/main/python/pi.py", line 29, in 
    sc = SparkContext(appName="PythonPi")
  File "/home/hadoop/spark/python/pyspark/context.py", line 138, in __init__
    self._accumulatorServer = accumulators._start_update_server()
  File "/home/hadoop/spark/python/pyspark/accumulators.py", line 224, in _start_update_server
    server = SocketServer.TCPServer(("localhost", 0), _UpdateRequestHandler)
  File "/usr/lib64/python2.6/SocketServer.py", line 402, in __init__
    self.server_bind()
  File "/usr/lib64/python2.6/SocketServer.py", line 413, in server_bind
    self.socket.bind(self.server_address)
  File "", line 1, in bind
socket.gaierror: [Errno -3] Temporary failure in name resolution

# 正常执行如下
[hadoop@master python]$ spark-submit --master spark://master:7077 --executor-memory 200m --driver-memory 200m pi.py
Spark assembly has been built with Hive, including Datanucleus jars on classpath
15/03/25 12:18:27 INFO spark.SecurityManager: Changing view acls to: hadoop
15/03/25 12:18:27 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(hadoop)
15/03/25 12:18:28 INFO slf4j.Slf4jLogger: Slf4jLogger started
15/03/25 12:18:28 INFO Remoting: Starting remoting
15/03/25 12:18:29 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://spark@master:47877]
15/03/25 12:18:29 INFO Remoting: Remoting now listens on addresses: [akka.tcp://spark@master:47877]
15/03/25 12:18:29 INFO spark.SparkEnv: Registering MapOutputTracker
15/03/25 12:18:29 INFO spark.SparkEnv: Registering BlockManagerMaster
15/03/25 12:18:29 INFO storage.DiskBlockManager: Created local directory at /tmp/spark-local-20150325121829-88cd
15/03/25 12:18:29 INFO storage.MemoryStore: MemoryStore started with capacity 116.0 MB.
15/03/25 12:18:30 INFO network.ConnectionManager: Bound socket to port 48556 with id = ConnectionManagerId(master,48556)
15/03/25 12:18:30 INFO storage.BlockManagerMaster: Trying to register BlockManager
15/03/25 12:18:30 INFO storage.BlockManagerInfo: Registering block manager master:48556 with 116.0 MB RAM
15/03/25 12:18:30 INFO storage.BlockManagerMaster: Registered BlockManager
15/03/25 12:18:30 INFO spark.HttpServer: Starting HTTP Server
15/03/25 12:18:30 INFO server.Server: jetty-8.y.z-SNAPSHOT
15/03/25 12:18:30 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:48872
15/03/25 12:18:30 INFO broadcast.HttpBroadcast: Broadcast server started at http://192.168.1.2:48872
15/03/25 12:18:30 INFO spark.HttpFileServer: HTTP File server directory is /tmp/spark-e2d76bbd-d2f6-4b2f-a018-f2d795a488aa
15/03/25 12:18:30 INFO spark.HttpServer: Starting HTTP Server
15/03/25 12:18:30 INFO server.Server: jetty-8.y.z-SNAPSHOT
15/03/25 12:18:30 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:43148
15/03/25 12:18:31 INFO server.Server: jetty-8.y.z-SNAPSHOT
15/03/25 12:18:31 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040
15/03/25 12:18:31 INFO ui.SparkUI: Started SparkUI at http://master:4040
15/03/25 12:18:32 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/03/25 12:18:35 INFO scheduler.EventLoggingListener: Logging events to hdfs://master:9000/spark/log/pythonpi-1427311113352
15/03/25 12:18:35 INFO util.Utils: Copying /home/hadoop/spark-1.0.2-bin-hadoop2/examples/src/main/python/pi.py to /tmp/spark-b66e65a9-91dc-479c-8938-14314fd1febb/pi.py
15/03/25 12:18:36 INFO spark.SparkContext: Added file file:/home/hadoop/spark-1.0.2-bin-hadoop2/examples/src/main/python/pi.py at http://192.168.1.2:43148/files/pi.py with timestamp 1427311115
93515/03/25 12:18:36 INFO client.AppClient$ClientActor: Connecting to master spark://master:7077...
15/03/25 12:18:38 INFO spark.SparkContext: Starting job: reduce at /home/hadoop/spark-1.0.2-bin-hadoop2/examples/src/main/python/pi.py:38
15/03/25 12:18:38 INFO scheduler.DAGScheduler: Got job 0 (reduce at /home/hadoop/spark-1.0.2-bin-hadoop2/examples/src/main/python/pi.py:38) with 2 output partitions (allowLocal=false)
15/03/25 12:18:38 INFO scheduler.DAGScheduler: Final stage: Stage 0(reduce at /home/hadoop/spark-1.0.2-bin-hadoop2/examples/src/main/python/pi.py:38)
15/03/25 12:18:38 INFO scheduler.DAGScheduler: Parents of final stage: List()
15/03/25 12:18:38 INFO scheduler.DAGScheduler: Missing parents: List()
15/03/25 12:18:38 INFO scheduler.DAGScheduler: Submitting Stage 0 (PythonRDD[1] at RDD at PythonRDD.scala:37), which has no missing parents
15/03/25 12:18:38 INFO scheduler.DAGScheduler: Submitting 2 missing tasks from Stage 0 (PythonRDD[1] at RDD at PythonRDD.scala:37)
15/03/25 12:18:38 INFO scheduler.TaskSchedulerImpl: Adding task set 0.0 with 2 tasks
15/03/25 12:18:38 INFO cluster.SparkDeploySchedulerBackend: Connected to Spark cluster with app ID app-20150325121838-0001
15/03/25 12:18:38 INFO client.AppClient$ClientActor: Executor added: app-20150325121838-0001/0 on worker-20150325114825-slave1-50832 (slave1:50832) with 1 cores
15/03/25 12:18:38 INFO cluster.SparkDeploySchedulerBackend: Granted executor ID app-20150325121838-0001/0 on hostPort slave1:50832 with 1 cores, 200.0 MB RAM
15/03/25 12:18:38 INFO client.AppClient$ClientActor: Executor added: app-20150325121838-0001/1 on worker-20150325114823-slave2-56888 (slave2:56888) with 1 cores
15/03/25 12:18:38 INFO cluster.SparkDeploySchedulerBackend: Granted executor ID app-20150325121838-0001/1 on hostPort slave2:56888 with 1 cores, 200.0 MB RAM
15/03/25 12:18:39 INFO client.AppClient$ClientActor: Executor updated: app-20150325121838-0001/0 is now RUNNING
15/03/25 12:18:39 INFO client.AppClient$ClientActor: Executor updated: app-20150325121838-0001/1 is now RUNNING
15/03/25 12:18:43 INFO cluster.SparkDeploySchedulerBackend: Registered executor: Actor[akka.tcp://sparkExecutor@slave1:35398/user/Executor#765391125] with ID 0
15/03/25 12:18:43 INFO scheduler.TaskSetManager: Starting task 0.0:0 as TID 0 on executor 0: slave1 (PROCESS_LOCAL)
15/03/25 12:18:43 INFO scheduler.TaskSetManager: Serialized task 0.0:0 as 374986 bytes in 12 ms
15/03/25 12:18:44 INFO cluster.SparkDeploySchedulerBackend: Registered executor: Actor[akka.tcp://sparkExecutor@slave2:37669/user/Executor#2076348799] with ID 1
15/03/25 12:18:44 INFO scheduler.TaskSetManager: Starting task 0.0:1 as TID 1 on executor 1: slave2 (PROCESS_LOCAL)
15/03/25 12:18:44 INFO scheduler.TaskSetManager: Serialized task 0.0:1 as 502789 bytes in 4 ms
15/03/25 12:18:44 INFO storage.BlockManagerInfo: Registering block manager slave1:47192 with 116.0 MB RAM
15/03/25 12:18:44 INFO storage.BlockManagerInfo: Registering block manager slave2:42313 with 116.0 MB RAM
15/03/25 12:18:46 INFO scheduler.TaskSetManager: Finished TID 0 in 2534 ms on slave1 (progress: 1/2)
15/03/25 12:18:46 INFO scheduler.DAGScheduler: Completed ResultTask(0, 0)
15/03/25 12:18:46 INFO scheduler.TaskSetManager: Finished TID 1 in 2234 ms on slave2 (progress: 2/2)
15/03/25 12:18:46 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
15/03/25 12:18:46 INFO scheduler.DAGScheduler: Completed ResultTask(0, 1)
15/03/25 12:18:46 INFO scheduler.DAGScheduler: Stage 0 (reduce at /home/hadoop/spark-1.0.2-bin-hadoop2/examples/src/main/python/pi.py:38) finished in 7.867 s
15/03/25 12:18:46 INFO spark.SparkContext: Job finished: reduce at /home/hadoop/spark-1.0.2-bin-hadoop2/examples/src/main/python/pi.py:38, took 8.181053565 s
Pi is roughly 3.147220
15/03/25 12:18:46 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/metrics/json,null}
15/03/25 12:18:46 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/stages/stage/kill,null}
15/03/25 12:18:46 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/,null}
15/03/25 12:18:46 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/static,null}
15/03/25 12:18:46 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/executors/json,null}
15/03/25 12:18:46 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/executors,null}
15/03/25 12:18:46 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/environment/json,null}
15/03/25 12:18:46 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/environment,null}
15/03/25 12:18:46 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/storage/rdd/json,null}
15/03/25 12:18:46 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/storage/rdd,null}
15/03/25 12:18:46 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/storage/json,null}
15/03/25 12:18:46 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/storage,null}
15/03/25 12:18:46 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/stages/pool/json,null}
15/03/25 12:18:46 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/stages/pool,null}
15/03/25 12:18:46 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/stages/stage/json,null}
15/03/25 12:18:46 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/stages/stage,null}
15/03/25 12:18:46 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/stages/json,null}
15/03/25 12:18:46 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/stages,null}
15/03/25 12:18:46 INFO ui.SparkUI: Stopped Spark web UI at http://master:4040
15/03/25 12:18:46 INFO scheduler.DAGScheduler: Stopping DAGScheduler
15/03/25 12:18:46 INFO cluster.SparkDeploySchedulerBackend: Shutting down all executors
15/03/25 12:18:46 INFO cluster.SparkDeploySchedulerBackend: Asking each executor to shut down
15/03/25 12:18:47 INFO spark.MapOutputTrackerMasterActor: MapOutputTrackerActor stopped!
15/03/25 12:18:47 INFO network.ConnectionManager: Selector thread was interrupted!
15/03/25 12:18:47 INFO network.ConnectionManager: ConnectionManager stopped
15/03/25 12:18:47 INFO storage.MemoryStore: MemoryStore cleared
15/03/25 12:18:47 INFO storage.BlockManager: BlockManager stopped
15/03/25 12:18:47 INFO storage.BlockManagerMasterActor: Stopping BlockManagerMaster
15/03/25 12:18:47 INFO storage.BlockManagerMaster: BlockManagerMaster stopped
15/03/25 12:18:47 INFO remote.RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
15/03/25 12:18:47 INFO remote.RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
15/03/25 12:18:47 INFO Remoting: Remoting shut down
15/03/25 12:18:47 INFO remote.RemoteActorRefProvider$RemotingTerminator: Remoting shut down.
15/03/25 12:18:48 INFO spark.SparkContext: Successfully stopped SparkContext

查看任务监控,http://192.168.1.2:8080/

Spark On Yarn实战

Spark On Yarn实战

查看worker信息 http://192.168.1.3:8081/

Spark On Yarn实战

spark on yarn实践

[hadoop@master ~]$ cd spark/examples/src/main/scala/org/apache/spark/examples/
[hadoop@master examples]$ spark-submit --master yarn-cluster \
> --class org.apache.spark.examples.SparkPi \
> --driver-memory 400m \
> --executor-memory 400m \
> --executor-cores 1 \
> --num-executors 2 \
> /home/hadoop/spark/lib/spark-examples-1.0.2-hadoop2.2.0.jar 2

# 如报下面错误,修改yarn-site.xml文件
	
		yarn.scheduler.maximum-allocation-mb
		800
	
# value大于800即可,然后重启yarn

# 正常结果如下:

[hadoop@master sbin]$ spark-submit --master yarn-cluster --class org.apache.spark.examples.SparkPi --driver-memory 400m --executor-memory 400m --executor-cores 1 --num-executors 2 /home/hadoop
/spark/lib/spark-examples-1.0.2-hadoop2.2.0.jar 2Spark assembly has been built with Hive, including Datanucleus jars on classpath
15/03/25 13:06:08 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/03/25 13:06:09 INFO client.RMProxy: Connecting to ResourceManager at master/192.168.1.2:8032
15/03/25 13:06:09 INFO yarn.Client: Got Cluster metric info from ApplicationsManager (ASM), number of NodeManagers: 2
15/03/25 13:06:09 INFO yarn.Client: Queue info ... queueName: default, queueCurrentCapacity: 0.0, queueMaxCapacity: 1.0,
      queueApplicationCount = 0, queueChildQueueCount = 0
15/03/25 13:06:09 INFO yarn.Client: Max mem capabililty of a single resource in this cluster 800
15/03/25 13:06:09 INFO yarn.Client: Preparing Local resources
15/03/25 13:06:10 INFO yarn.Client: Uploading file:/home/hadoop/spark/lib/spark-examples-1.0.2-hadoop2.2.0.jar to hdfs://master:9000/user/hadoop/.sparkStaging/application_1427313904247_0001/sp
ark-examples-1.0.2-hadoop2.2.0.jar15/03/25 13:06:13 INFO yarn.Client: Uploading file:/home/hadoop/spark-1.0.2-bin-hadoop2/lib/spark-assembly-1.0.2-hadoop2.2.0.jar to hdfs://master:9000/user/hadoop/.sparkStaging/application_142
7313904247_0001/spark-assembly-1.0.2-hadoop2.2.0.jar15/03/25 13:06:25 INFO yarn.Client: Setting up the launch environment
15/03/25 13:06:25 INFO yarn.Client: Setting up container launch context
15/03/25 13:06:25 INFO yarn.Client: Command for starting the Spark ApplicationMaster: List($JAVA_HOME/bin/java, -server, -Xmx400m, -Djava.io.tmpdir=$PWD/tmp, -Dspark.app.name=\"org.apache.spar
k.examples.SparkPi\", -Dspark.eventLog.enabled=\"true\", -Dspark.eventLog.dir=\"hdfs://master:9000/spark/log\", -Dspark.yarn.historyServer.address=\"master:18080\",  -Dlog4j.configuration=log4j-spark-container.properties, org.apache.spark.deploy.yarn.ApplicationMaster, --class, org.apache.spark.examples.SparkPi, --jar , file:/home/hadoop/spark/lib/spark-examples-1.0.2-hadoop2.2.0.jar,  --args  '2' , --executor-memory, 400, --executor-cores, 1, --num-executors , 2, 1>, /stdout, 2>, /stderr)15/03/25 13:06:25 INFO yarn.Client: Submitting application to ASM
15/03/25 13:06:25 INFO impl.YarnClientImpl: Submitted application application_1427313904247_0001 to ResourceManager at master/192.168.1.2:8032
15/03/25 13:06:26 INFO yarn.Client: Application report from ASM: 
	 application identifier: application_1427313904247_0001
	 appId: 1
	 clientToAMToken: null
	 appDiagnostics: 
	 appMasterHost: N/A
	 appQueue: default
	 appMasterRpcPort: 0
	 appStartTime: 1427313985731
	 yarnAppState: ACCEPTED
	 distributedFinalState: UNDEFINED
	 appTrackingUrl: master:8088/proxy/application_1427313904247_0001/
	 appUser: hadoop
15/03/25 13:06:27 INFO yarn.Client: Application report from ASM: 
	 application identifier: application_1427313904247_0001
	 appId: 1
	 clientToAMToken: null
	 appDiagnostics: 
	 appMasterHost: N/A
	 appQueue: default
	 appMasterRpcPort: 0
	 appStartTime: 1427313985731
	 yarnAppState: ACCEPTED
	 distributedFinalState: UNDEFINED
	 appTrackingUrl: master:8088/proxy/application_1427313904247_0001/
	 appUser: hadoop	

查看yarn监控页面:http://192.168.1.2:8088/cluster

Spark On Yarn实战

Spark On Yarn实战

可以看到任务是在slave2上面执行的

访问http://192.168.1.4:8042/node

Spark On Yarn实战

登录slave2查看

[hadoop@slave2 ~]$ cd /home/hadoop/hadoop/logs/userlogs/application_1427313904247_0001/container_1427313904247_0001_01_000001
[hadoop@slave2 container_1427313904247_0001_01_000001]$ ls
stderr  stdout
[hadoop@slave2 container_1427313904247_0001_01_000001]$ cat stdout 
Pi is roughly 3.13774
[hadoop@slave2 ~]$ cd /home/hadoop/spark/examples/src/main/scala/org/apache/spark/examples/
[hadoop@slave2 examples]$ cat SparkPi.scala 
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.examples

import scala.math.random

import org.apache.spark._

/** Computes an approximation to pi */
object SparkPi {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Spark Pi")
    val spark = new SparkContext(conf)
    val slices = if (args.length > 0) args(0).toInt else 2
    val n = 100000 * slices
    val count = spark.parallelize(1 to n, slices).map { i =>
      val x = random * 2 - 1
      val y = random * 2 - 1
      if (x*x + y*y < 1) 1 else 0
    }.reduce(_ + _)
    println("Pi is roughly " + 4.0 * count / n)
    spark.stop()
  }
}

在yarn上面使用spark-shell

[hadoop@master ~]$ spark-shell --master yarn-client

网页标题:SparkOnYarn实战
分享URL:http://csdahua.cn/article/pohehj.html
扫二维码与项目经理沟通

我们在微信上24小时期待你的声音

解答本文疑问/技术咨询/运营咨询/技术建议/互联网交流