How TaskScheduler determines Spark's run mode, and how to use each mode



Spark run modes:

  • local[N]
  • local-cluster[workers,cores,memory]
  • spark://localhost:port
  • mesos://host:port
  • yarn-standalone / yarn-cluster
  • yarn-client
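
As a rough illustration of how a master string like the ones above could be mapped to a run mode, here is a minimal sketch. The `MasterUrl` object and its `Mode` types are hypothetical names invented for this example; they are not Spark's classes, and the patterns are an assumption about what a reasonable matcher looks like.

```scala
object MasterUrl {
  // Hypothetical mode types for illustration only (not Spark's own classes).
  sealed trait Mode
  case object Local extends Mode
  case class LocalN(threads: Int) extends Mode
  case class LocalCluster(workers: Int, coresPerWorker: Int, memoryPerWorkerMb: Int) extends Mode
  case class Standalone(url: String) extends Mode
  case class Mesos(url: String) extends Mode
  case object YarnCluster extends Mode
  case object YarnClient extends Mode

  // A Regex used as a pattern must match the whole string.
  private val LocalNRegex = """local\[([0-9]+)\]""".r
  private val LocalClusterRegex =
    """local-cluster\[\s*([0-9]+)\s*,\s*([0-9]+)\s*,\s*([0-9]+)\s*\]""".r
  private val SparkRegex = """spark://(.*)""".r
  private val MesosRegex = """mesos://(.*)""".r

  def parse(master: String): Mode = master match {
    case "local"                            => Local
    case LocalNRegex(n)                     => LocalN(n.toInt)
    case LocalClusterRegex(w, c, m)         => LocalCluster(w.toInt, c.toInt, m.toInt)
    case SparkRegex(_)                      => Standalone(master)
    case MesosRegex(_)                      => Mesos(master)
    case "yarn-standalone" | "yarn-cluster" => YarnCluster
    case "yarn-client"                      => YarnClient
    case other =>
      throw new IllegalArgumentException("Could not parse master URL: '" + other + "'")
  }
}
```

For example, `MasterUrl.parse("local-cluster[2,2,1024]")` yields `LocalCluster(2, 2, 1024)`; each resulting mode would then pick its own scheduler/backend pair.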

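Resource scheduling:

  • DAGScheduler (handles dependencies, splits a job into multiple stages at shuffle boundaries, takes data locality into account) -> TaskSet -> TaskScheduler -> launches tasks and monitors them
  • The DAGScheduler is the same in every mode; what differs is how the TaskScheduler assigns tasks to compute resources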

TaskScheduler

A TaskScheduler/SchedulerBackend pair is what interacts with the DAGScheduler.
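
To make the division of labour in such a pair concrete, here is a small self-contained sketch. All types in it (`ToyTaskScheduler`, `FixedBackend`, and the simplified `Task`, `TaskSet`, `WorkerOffer`) are hypothetical stand-ins written for this post, not Spark's real classes. The assumption is simply that the scheduler owns the queue of tasks while the backend owns the knowledge of available resources.

```scala
import scala.collection.mutable

object SchedulerPairSketch {
  // Simplified, hypothetical stand-ins for the real Spark types.
  case class Task(id: Long, stageId: Int)
  case class TaskSet(stageId: Int, tasks: Seq[Task])
  case class WorkerOffer(executorId: String, cores: Int)

  // The backend knows which resources exist and asks the scheduler to fill them.
  trait SchedulerBackend {
    def reviveOffers(): Unit
  }

  // The scheduler knows which tasks are waiting.
  class ToyTaskScheduler {
    private val pending = mutable.Queue[Task]()
    var backend: SchedulerBackend = null

    // Called by the (imaginary) DAGScheduler with the tasks of one stage.
    def submitTasks(taskSet: TaskSet): Unit = {
      pending ++= taskSet.tasks
      backend.reviveOffers()
    }

    // Called by the backend: hand out at most one task per offered core.
    def resourceOffers(offers: Seq[WorkerOffer]): Seq[(String, Task)] = {
      val assigned = mutable.ArrayBuffer[(String, Task)]()
      for (offer <- offers; _ <- 1 to offer.cores if pending.nonEmpty) {
        assigned += ((offer.executorId, pending.dequeue()))
      }
      assigned.toList
    }
  }

  // A backend with a fixed set of resources; it only records what it would launch.
  class FixedBackend(scheduler: ToyTaskScheduler, offers: Seq[WorkerOffer])
      extends SchedulerBackend {
    val launched = mutable.ArrayBuffer[(String, Task)]()
    def reviveOffers(): Unit = launched ++= scheduler.resourceOffers(offers)
  }
}
```

Swapping `FixedBackend` for a different backend changes where tasks go without touching the scheduler, which is the point of keeping the two in a pair.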

Part of the implementation:

    private[spark] trait TaskScheduler {
      // submitTasks: submit a set of tasks to run
      // cancelTasks: cancel all of those tasks
      // setDAGScheduler: set the DAGScheduler
      // defaultParallelism: the level of parallelism
    }

    // The implementation that extends it (signatures only, bodies elided)
    private[spark] class TaskSchedulerImpl(
        val sc: SparkContext,
        val maxTaskFailures: Int,
        isLocal: Boolean = false)
      extends TaskScheduler with Logging {
      def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]]
      def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer)
    }

    private[spark] trait ExecutorBackend {
      def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer)
    }





    // Resource offers arrive (the method in Spark is named reviveOffers)
    SchedulerBackend.reviveOffers()

    // The task is run
    Executor -> TaskRunner.run




    // The pool that tasks run in
    val threadPool = Utils.newDaemonCachedThreadPool("Executor task launch worker")
    private val runningTasks = new ConcurrentHashMap[Long, TaskRunner]

    def launchTask(context: ExecutorBackend, taskId: Long, serializedTask: ByteBuffer) {
      val tr = new TaskRunner(context, taskId, serializedTask)
      runningTasks.put(taskId, tr)
      threadPool.execute(tr)
    }
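
The same pattern can be tried outside Spark using only the JDK. The sketch below is a hypothetical `ToyExecutor` written for this post: it assumes that a daemon cached thread pool plus a concurrent map of running tasks is the essence of the snippet above. It uses `java.util.concurrent` directly rather than Spark's `Utils`, and a plain function stands in for the serialized task.

```scala
import java.util.concurrent.{ConcurrentHashMap, Executors, ThreadFactory}
import java.util.concurrent.atomic.AtomicInteger

object ToyExecutor {
  // Daemon threads, so that an idle pool never keeps the JVM alive.
  private val daemonFactory = new ThreadFactory {
    private val counter = new AtomicInteger(0)
    def newThread(r: Runnable): Thread = {
      val t = new Thread(r, "toy-task-launch-worker-" + counter.getAndIncrement())
      t.setDaemon(true)
      t
    }
  }

  // Cached pool: threads are created on demand and reused when idle.
  private val threadPool = Executors.newCachedThreadPool(daemonFactory)

  // Tasks currently known to this executor, keyed by task id.
  val runningTasks = new ConcurrentHashMap[Long, Runnable]()

  // Register the task, run it on a pooled thread, and deregister it when done.
  def launchTask(taskId: Long, body: () => Unit): Unit = {
    val runner = new Runnable {
      def run(): Unit = {
        try body()
        finally runningTasks.remove(taskId)
      }
    }
    runningTasks.put(taskId, runner)
    threadPool.execute(runner)
  }
}
```

Calling `ToyExecutor.launchTask(1L, () => println("task 1"))` runs the body on a pool thread; many tasks share one process this way.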

Modes:

local:

Usage

- ./bin/run-example org.apache.spark.examples.SparkPi local
- ./bin/spark-submit --class my.main.ClassName --master local[8] my-app.jar

Implementation:

  • TaskSchedulerImpl -> LocalBackend.reviveOffers() -> Executor -> TaskRunner.run

  • LocalActor: an Akka actor??

standalone:

Usage

- ./sbin/start-master.sh
- ./bin/spark-class org.apache.spark.deploy.worker.Worker spark://MasterURL:PORT
- Configuration: the conf/slaves file, spark-env.sh, and the sbin/*.sh scripts
- ./bin/run-example org.apache.spark.examples.SparkPi spark://10.0.2.31:7077
- New style: ./bin/spark-submit --class my.main.ClassName --master spark://mycluster:7077 --executor-memory 20G --total-executor-cores 100 my-app.jar

Implementation

  • TaskSchedulerImpl works together with SparkDeploySchedulerBackend, which extends CoarseGrainedSchedulerBackend and uses an Akka actor to control the executors

Sequence:

The important part: start SparkDeploySchedulerBackend ->

SparkDeploySchedulerBackend -> client -> master -> worker -> CoarseGrainedExecutorBackend, which registers back with SparkDeploySchedulerBackend -> Driver Actor

SparkDeploySchedulerBackend -> client actor
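
The last hop of that sequence, where an executor backend registers with the driver and then starts receiving tasks, can be sketched without Akka. Everything below is a simplified, hypothetical model: the message names echo the idea of the handshake, but `ToyDriver` and `ToyExecutorBackend` are invented for this example and plain method calls stand in for actor messages.

```scala
import scala.collection.mutable

object RegistrationSketch {
  // Simplified, hypothetical messages; plain method calls replace actor sends.
  sealed trait Message
  case class RegisterExecutor(executorId: String, cores: Int) extends Message
  case object RegisteredExecutor extends Message
  case class LaunchTask(taskId: Long) extends Message

  // Stand-in for an executor backend started by a worker.
  class ToyExecutorBackend(val executorId: String, val cores: Int) {
    val received = mutable.ArrayBuffer[Message]()
    def receive(msg: Message): Unit = received += msg

    // On start-up the backend announces itself to the driver.
    def start(driver: ToyDriver): Unit =
      driver.receive(RegisterExecutor(executorId, cores), this)
  }

  // Stand-in for the driver side of the scheduler backend.
  class ToyDriver(pendingTaskIds: Seq[Long]) {
    private val pending = mutable.Queue[Long]()
    pending ++= pendingTaskIds

    def receive(msg: Message, sender: ToyExecutorBackend): Unit = msg match {
      case RegisterExecutor(_, cores) =>
        // Acknowledge, then fill the newly offered cores with waiting tasks.
        sender.receive(RegisteredExecutor)
        var free = cores
        while (free > 0 && pending.nonEmpty) {
          sender.receive(LaunchTask(pending.dequeue()))
          free -= 1
        }
      case _ => // other messages are ignored in this sketch
    }
  }
}
```

With three pending tasks and one two-core executor, the executor sees an acknowledgement followed by two `LaunchTask` messages; the third task waits for more cores.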

local cluster: pseudo-distributed

Usage

./bin/run-example org.apache.spark.examples.SparkPi local-cluster[2,2,1024]

The implementation is simply a standalone cluster running locally.

mesos: omitted

yarn standalone (0.9) -> yarn cluster (1.0)

Usage

  • Build: SPARK_HADOOP_VERSION=2.2.0 SPARK_YARN=true sbt/sbt assembly
  • Package your own application
  • spark.yarn.submit.file.replication=10
  • spark.yarn.preserve.staging.files=true
  • spark.yarn.scheduler.heartbeat.interval-ms=5000 (milliseconds)
  • spark.yarn.max.worker.failures=?
  • Run:

      $ SPARK_JAR=./assembly/target/scala-2.10/spark-assembly-0.9.1-hadoop2.2.0.jar \
      ./bin/spark-class org.apache.spark.deploy.yarn.Client \
      --jar examples/target/scala-2.10/spark-examples-assembly-0.9.1.jar \
      --class org.apache.spark.examples.SparkPi \
      --args yarn-standalone \
      --num-workers 3 \
      --master-memory 4g \
      --worker-memory 2g \
      --worker-cores 1
    
      New style:

      HADOOP_CONF_DIR=XX ./bin/spark-submit \
      --class my.main.ClassName \
      --master yarn-cluster \
      --executor-memory 20G \
      --num-executors 50 \
      my-app.jar
    

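Implementation:

  • Submit the app
  • YarnRPC
  • AmIpFilter
  • A driver thread starts the application -> initializes the SparkContext
  • Start the YARN allocator
  • YarnClusterScheduler.postStartHook() -> notifies the AM that the SparkContext has been initialized (???)
  • YarnRPC: register to the AM
  • allocator: number of executor containers
  • ExecutorRunnable -> CoarseGrainedExecutorBackend -> jobs start
  • Monitor: Akka -> CoarseGrainedSchedulerBackend
  • YarnClusterScheduler: I had honestly never seen this thing before! It is just a simple TaskSchedulerImpl implementation with added logic for waiting on executors; the name sounds far more impressive than it is...
  • TaskSchedulerImpl -> YarnClusterScheduler -> CoarseGrainedSchedulerBackend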
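
The "wait for executors" idea mentioned above can be illustrated with a small sketch. This is not Spark's actual code: `BaseScheduler` and `WaitingScheduler` are hypothetical classes, and the assumption is only that a subclass overrides a post-start hook to block until enough executors have registered or a timeout expires.

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}

object WaitForExecutorsSketch {
  // Hypothetical base class standing in for a general-purpose scheduler.
  class BaseScheduler {
    def postStartHook(): Unit = ()
  }

  // Adds one thing only: block after start until executors have shown up.
  class WaitingScheduler(expectedExecutors: Int, maxWaitMs: Long) extends BaseScheduler {
    private val registered = new CountDownLatch(expectedExecutors)

    // Called once for each executor that registers.
    def executorRegistered(): Unit = registered.countDown()

    // True once the expected number of executors has registered.
    def allRegistered: Boolean = registered.getCount == 0

    // Returns when all expected executors registered or the timeout passed.
    override def postStartHook(): Unit = {
      registered.await(maxWaitMs, TimeUnit.MILLISECONDS)
    }
  }
}
```

The timeout matters: without it, a cluster that never grants the requested containers would hang the application at start-up instead of letting it run with fewer executors.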

yarn client

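Usage: no Client class is needed, just configure the paths!! The "AM-like SparkContext" runs locally. This fits the Spark Shell, which needs interaction, debugging-style work, and anything that stays resident on the local machine.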

 SPARK_JAR
 SPARK_WORKER_INSTANCES 2
 SPARK_WORKER_CORES 1
 SPARK_WORKER_MEMORY 1G
 SPARK_MASTER_MEMORY 512 MB
 SPARK_YARN_APP_NAME 
 SPARK_YARN_QUEUE 'default'
 SPARK_YARN_DIST_FILES
 SPARK_YARN_DIST_ARCHIVES

SPARK_JAR=./assembly/target/scala-2.10/spark-assembly-0.9.1-hadoop2.2.0.jar \
SPARK_YARN_APP_JAR=examples/target/scala-2.10/spark-examples-assembly-0.9.1.jar \
./bin/run-example org.apache.spark.examples.SparkPi yarn-client

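Implementation:

  • The AM does two things: 1. requests container resources; 2. maintains the driver's scheduling
  • This mode moves the AM's scheduling to the local machine, where it communicates with the ExecutorBackend

Sequence:

- initSparkContext
  MapOutputTracker -> taskScheduler (selects the scheduler (YarnClientClusterScheduler) and the backend (YarnClientSchedulerBackend)) -> dagScheduler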
- .runApp->ExecutorLauncher  
   amClient->register->driver->MonitorActor--CoarseGrainedSchedulerBackend

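Compared with MapReduce:

  • task: process-level (MapReduce) vs. thread-level (Spark)

Submitting a job written in Scala

  • Write the program in Scala
  • Create a ***.sbt file: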

    name := "Spark application in Scala"

    version := "1.0"

    scalaVersion := "2.10.4"

    libraryDependencies += "org.apache.spark" %% "spark-core" % "1.0.0"

    resolvers += "Akka Repository" at "http://repo.akka.io/releases/"

  • sbt/sbt package
  • bin/spark-submit --class "scala.Test" --master local[4] target/scala-2.10/simple-project_2.10-1.0.jar
大山 /
Published under (CC) BY-NC-SA in categories 逻辑与现象  spark  tagged with spark  yarn