033 Java Spark的编程

1.Java SparkCore编程

  入口是:JavaSparkContext
    基本的RDD是:JavaRDD
    其他常用RDD: JavaPairRDD
  JavaRDD和JavaPairRDD转换:
    JavaRDD => JavaPairRDD: 通过mapToPair函数
    JavaPairRDD => JavaRDD: 通过map函数转换

2.前提

  运行前将core-site.xml复制到项目的resources文件夹(如src/main/resources)中,使程序能从classpath加载HDFS配置,从而正确解析代码中不带scheme的文件路径

3.程序  

  1 package com.ibeifeng.senior;
  2 
  3 import org.apache.spark.SparkConf;
  4 import org.apache.spark.api.java.JavaPairRDD;
  5 import org.apache.spark.api.java.JavaRDD;
  6 import org.apache.spark.api.java.JavaSparkContext;
  7 import org.apache.spark.api.java.function.FlatMapFunction;
  8 import org.apache.spark.api.java.function.Function2;
  9 import org.apache.spark.api.java.function.PairFunction;
 10 import org.apache.spark.api.java.function.VoidFunction;
 11 import scala.Tuple2;
 12 
 13 import java.sql.Connection;
 14 import java.sql.DriverManager;
 15 import java.sql.PreparedStatement;
 16 import java.util.Arrays;
 17 import java.util.Iterator;
 18 
 19 /**
 20  * Java实现Spark的WordCount程序
 21  * Created by ibf on 02/15.
 22  */
 23 public class JavaWordCountSparkCore {
 24     public static void main(String[] args) {
 25         String resultHDFSSavePath = "/beifeng/spark/result/wordcount/" + System.currentTimeMillis();
 26         // 1. 创建SparkConf配置信息
 27         SparkConf conf = new SparkConf()
 28                 .setMaster("local[*]")
 29                 .setAppName("spark-wordcount");
 30 
 31         // 2. 创建SparkContext对象,在java编程中,该对象叫做JavaSparkContext
 32         JavaSparkContext sc = new JavaSparkContext(conf);
 33 
 34         // 3. 从hdfs读取文件形成RDD
 35         // TODO: 文件路径自行给定
 36         JavaRDD<String> rdd = sc.textFile("/hive/common.db/dept");
 37 
 38         // 4. RDD数据处理
 39         // TODO: 过滤特殊字符
 40         // 4.1 行数据的分割,调用flatMap函数
 41         JavaRDD<String> wordRDD = rdd.flatMap(new FlatMapFunction<String, String>() {
 42             @Override
 43             public Iterable<String> call(String s) throws Exception {
 44                 String line = s;
 45                 if (line == null) line = "";
 46                 String[] arr = line.split("	");
 47                 return Arrays.asList(arr);
 48             }
 49         });
 50 
 51         // 4.2 将数据转换为key/value键值对
 52         /**
 53          * RDD的reduceByKey函数不是RDD类中,通过隐式转换后,存在于其他类中<br/>
 54          * Java由于不存在隐式转换,所以不能直接调用map函数进行key/value键值对转换操作,必须调用特定的函数
 55          * */
 56         JavaPairRDD<String, Integer> wordCountRDD = wordRDD.mapToPair(new PairFunction<String, String, Integer>() {
 57             @Override
 58             public Tuple2<String, Integer> call(String s) throws Exception {
 59                 return new Tuple2<String, Integer>(s, 1);
 60             }
 61         });
 62 
 63         // 4.3 聚合结果
 64         JavaPairRDD<String, Integer> resultRDD = wordCountRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
 65 
 66             @Override
 67             public Integer call(Integer v1, Integer v2) throws Exception {
 68                 return v1 + v2;
 69             }
 70         });
 71 
 72         // 5. 结果输出
 73         // 5.1 结果输出到HDFS
 74         resultRDD.saveAsTextFile(resultHDFSSavePath);
 75         // 5.2 结果输出到MySQL
 76         /**
 77          * SparkCore RDD数据的读入是通过InputFormat来读入数据形成RDD的
 78          *  sc.newAPIHadoopRDD[K, V, F <: NewInputFormat[K, V]](
 79          conf: Configuration,
 80          fClass: Class[F],
 81          kClass: Class[K],
 82          vClass: Class[V])
 83          * RDD的saveASxxxx相关方法是利用OutputFormat来进行数据输出的
 84          * resultRDD.saveAsNewAPIHadoopDataset(conf: Configuration);
 85          */
 86         resultRDD.foreachPartition(new VoidFunction<java.util.Iterator<Tuple2<String, Integer>>>() {
 87 
 88             @Override
 89             public void call(Iterator<Tuple2<String, Integer>> tuple2Iterator) throws Exception {
 90                 Class.forName("com.mysql.jdbc.Driver");
 91                 String url = "jdbc:mysql://hadoop-senior01:3306/test";
 92                 String username = "root";
 93                 String password = "123456";
 94                 Connection conn = null;
 95                 try {
 96                     // 1. 创建connection连接
 97                     conn = DriverManager.getConnection(url, username, password);
 98 
 99                     // 2. 构建statement
100                     String sql = "insert into wordcount values(?,?)";
101                     PreparedStatement pstmt = conn.prepareStatement(sql);
102 
103                     // 3. 结果数据输出
104                     while (tuple2Iterator.hasNext()) {
105                         Tuple2<String, Integer> t2 = tuple2Iterator.next();
106                         pstmt.setString(1, t2._1());
107                         pstmt.setLong(2, t2._2());
108 
109                         pstmt.executeUpdate();
110                     }
111                 } finally {
112                     // 4. 关闭连接
113                     conn.close();
114                 }
115 
116             }
117         });
118 
119 
120     }
121 }
原文地址:https://www.cnblogs.com/juncaoit/p/6543518.html