我们的demo是简单的用sql的方式从csv文件中筛选数据。
package Demo
import org.apache.spark._
import org.apache.spark.sql._
import scala.collection.mutable
object SqlDemo {
def main(argment: Array[String]): Unit = {
val conf = new SparkConf().setAppName("test").setMaster("local") //本地调试运行
val sc = new SparkContext(conf) // 建立spark操作上下文
val sqlContext = new SQLContext(sc) // 建立spark sql 操作上下文
val rowRDD = sc.textFile("xxxPath\\people.csv") //读取 csv 文件
.map(line => { // 转换成 row 结构
val data = line.split(";") // 切分字段
val list = mutable.ArrayBuffer[Any]()
list.append(data(0)) //填充数据
list.append(data(1).toInt) // 填充数据
Row.fromSeq(list) //创建row
})
val fields = new mutable.ArrayBuffer[StructField]()
fields.append(StructField("name",StringType,true)) //添加 name schema
fields.append(StructField("age",IntegerType,true)) //添加 age schema
val schema = StructType(fields) //创建schema 结构
val rdd = sqlContext.applySchema(rowRDD, schema) //将rdd与schema做匹配
rdd.registerTempTable("people") //注册临时表
sqlContext.sql("select * from people where age >= 18") // 筛选成年人士
.collect()
.foreach(println)
sc.stop()
}
}