Windows下用intellij搭建spark运行环境

rick    2018-01-27 01:28

环境版本说明:

windows10,java-1.8.0,scala-2.11.12,spark-2.1.2-hadoop2.6下载都在官网

1. java安装,不再多说

2. 下载scala和spark(此处一定要想好spark版本问题,其依赖的scala环境不一样,鄙人用的scala-2.11.12,spark-2.1.2-hadoop2.6)下载地址在官网(请自行百度,搜搜更健康)

3. 安装scala

http://www.scala-lang.org/download/2.11.12.html

scala下载地址还是贴出来哈哈~注意下载msi installer版本,直接安装,一般默认添加环境变量

验证:开始栏输入cmd

在cmd里输入scala-version

显示

Scala code runner version 2.12.4 -- Copyright 2002-2017, LAMP/EPFL and Lightbend, Inc.

4. 打开intellij

plugin里安装scala语言支持,安装后new—>project—>scala—>IDEA

添加Scala和java支持

5. 测试

在src里new一个scala class,输入下列代码,run

object HelloWorld {
  def main(args: Array[String]): Unit = {
    println("Hello, world!")
  }
}

控制台输出Hello, world!就正确了

6. 添加spark支持

这里初次是用spark1.6.3,其中集成了一个spark-assembly-hadoop的jar包,用它是很简单的,下帖有详细说明

https://www.jianshu.com/p/200473f264bc

但是到了spark2.x 以后,那个jar包被摘掉了,如果安装需要maven的支持(很麻烦,pom文件中的dependency下的spark版本对scala的版本要求的很仔细,鄙人不想再下一个scala了或者换个spark),所以想不用maven进行操作,这里在网上找了很多,终于发现简书的是最简单可行的,原帖如下

https://www.jianshu.com/p/a5258f2821fc

只需新建java工程,并在project structure中添加依赖库java、scala、spark源码库,同时需要构建一个目录树如图所示

接下来使用graphX,具体内容自行百度,找了网上的一段demo

package test.SparkDemo

import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD


object scala {
  def main(args: Array[String]) {
    //屏蔽日志
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)


    //设置运行环境
    val conf = new SparkConf().setAppName("SimpleGraphX").setMaster("local")
    val sc = new SparkContext(conf)


    //设置顶点和边,注意顶点和边都是用元组定义的Array
    //顶点的数据类型是VD:(String,Int)
    val vertexArray = Array(
      (1L, ("Alice", 28)),
      (2L, ("Bob", 27)),
      (3L, ("Charlie", 65)),
      (4L, ("David", 42)),
      (5L, ("Ed", 55)),
      (6L, ("Fran", 50))
    )

    //边的数据类型ED:Int
    val edgeArray = Array(
      Edge(2L, 1L, 7),
      Edge(2L, 4L, 2),
      Edge(3L, 2L, 4),
      Edge(3L, 6L, 3),
      Edge(4L, 1L, 1),
      Edge(5L, 2L, 2),
      Edge(5L, 3L, 8),
      Edge(5L, 6L, 3)
    )


    //构造vertexRDD和edgeRDD
    val vertexRDD: RDD[(Long, (String, Int))] = sc.parallelize(vertexArray)
    val edgeRDD: RDD[Edge[Int]] = sc.parallelize(edgeArray)


    //构造图Graph[VD,ED]
    val graph: Graph[(String, Int), Int] = Graph(vertexRDD, edgeRDD)


    //***********************************************************************************
    //***************************  图的属性    ****************************************
    //**********************************************************************************
    println("属性演示")
    println("**********************************************************")
    println("找出图中年龄大于30的顶点:")

    graph.vertices.filter { case (id, (name, age)) => age > 30}.collect.foreach {
      case (id, (name, age)) => println(s"$name is $age")
    }


    //边操作:找出图中属性大于5的边
    println("找出图中属性大于5的边:")
    graph.edges.filter(e => e.attr > 5).collect.foreach(e => println(s"${e.srcId} to ${e.dstId} att ${e.attr}"))
    println


    //triplets操作,((srcId, srcAttr), (dstId, dstAttr), attr)
    println("列出边属性>5的tripltes:")
    for (triplet <- graph.triplets.filter(t => t.attr > 5).collect) {
      println(s"${triplet.srcAttr._1} likes ${triplet.dstAttr._1}")
    }
    println

    //Degrees操作
    println("找出图中最大的出度、入度、度数:")
    def max(a: (VertexId, Int), b: (VertexId, Int)): (VertexId, Int) = {
      if (a._2 > b._2) a else b
    }
    println("max of outDegrees:" + graph.outDegrees.reduce(max) + " max of inDegrees:" + graph.inDegrees.reduce(max) + " max of Degrees:" + graph.degrees.reduce(max))
    println



    //***********************************************************************************
    //***************************  转换操作    ****************************************
    //**********************************************************************************
    println("**********************************************************")
    println("转换操作")
    println("**********************************************************")

    println("顶点的转换操作,顶点age + 10:")
    graph.mapVertices{ case (id, (name, age)) => (id, (name, age+10))}.vertices.collect.foreach(v => println(s"${v._2._1} is ${v._2._2}"))
    println
    println("边的转换操作,边的属性*2:")
    graph.mapEdges(e=>e.attr*2).edges.collect.foreach(e => println(s"${e.srcId} to ${e.dstId} att ${e.attr}"))
    println


    //***********************************************************************************
    //***************************  结构操作    ****************************************
    //**********************************************************************************
    println("**********************************************************")
    println("结构操作")
    println("**********************************************************")
    println("顶点年纪>30的子图:")

    val subGraph = graph.subgraph(vpred = (id, vd) => vd._2 >= 30)
    println("子图所有顶点:")
    subGraph.vertices.collect.foreach(v => println(s"${v._2._1} is ${v._2._2}"))
    println

    println("子图所有边:")
    subGraph.edges.collect.foreach(e => println(s"${e.srcId} to ${e.dstId} att ${e.attr}"))
    println


    //***********************************************************************************
    //***************************  连接操作    ****************************************
    //**********************************************************************************

    println("**********************************************************")
    println("连接操作")
    println("**********************************************************")

    val inDegrees: VertexRDD[Int] = graph.inDegrees
    case class User(name: String, age: Int, inDeg: Int, outDeg: Int)

    //创建一个新图,顶点VD的数据类型为User,并从graph做类型转换
    val initialUserGraph: Graph[User, Int] = graph.mapVertices { case (id, (name, age)) => User(name, age, 0, 0)}

    //initialUserGraph与inDegrees、outDegrees(RDD)进行连接,并修改initialUserGraph中inDeg值、outDeg值
    val userGraph = initialUserGraph.outerJoinVertices(initialUserGraph.inDegrees) {
      case (id, u, inDegOpt) => User(u.name, u.age, inDegOpt.getOrElse(0), u.outDeg)
    }.outerJoinVertices(initialUserGraph.outDegrees) {
      case (id, u, outDegOpt) => User(u.name, u.age, u.inDeg,outDegOpt.getOrElse(0))
    }


    println("连接图的属性:")
    userGraph.vertices.collect.foreach(v => println(s"${v._2.name} inDeg: ${v._2.inDeg}  outDeg: ${v._2.outDeg}"))
    println


    println("出度和入读相同的人员:")
    userGraph.vertices.filter {
      case (id, u) => u.inDeg == u.outDeg
    }.collect.foreach {
      case (id, property) => println(property.name)
    }
    println


    //***********************************************************************************
    //***************************  实用操作    ****************************************
    //**********************************************************************************
    println("**********************************************************")
    println("聚合操作")
    println("**********************************************************")
    println("找出5到各顶点的最短:")

    val sourceId: VertexId = 5L // 定义源点
    val initialGraph = graph.mapVertices((id, _) => if (id == sourceId) 0.0 else Double.PositiveInfinity)
    val sssp = initialGraph.pregel(Double.PositiveInfinity)(
      (id, dist, newDist) => math.min(dist, newDist),
      triplet => {  // 计算权重
        if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
          Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
        } else {
          Iterator.empty
        }
      },

      (a,b) => math.min(a,b) // 最短距离
    )

    println(sssp.vertices.collect.mkString("\n"))
    sc.stop()
  }
}

运行,结果如下

这里虽然报错了但是任然可以运行,百度出解决步骤

https://github.com/srccodes/hadoop-common-2.2.0-bin

到这里下载解压并添加bin文件到环境变量,具体trouble shooting在这里

http://blog.csdn.net/shawnhu007/article/details/51518879

再次运行,ok!

大致就这些,scala是函数式编程范式,了解不多,这里代码也是一知半解copy下来的,但是现在有了一个window上的spark编写调试工具,感觉很方便

贴图不够,但是大致流程如下,如有疑问欢迎提出,一起讨论!

Views: 1.9K

[[total]] comments

Post your comment
  1. [[item.time]]
    [[item.user.username]] [[item.floor]]Floor
  2. Click to load more...
  3. Post your comment