Spark RDD Java

1. Transformation operators

map

package com.journey.core.rdd.transform;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

import java.util.ArrayList;
import java.util.List;

/**
 * Maps the input one element at a time; the transformation can change the element's type or its value.
 */
public class MapRDD {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("MapRDD")
                .setMaster("local[*]");

        JavaSparkContext sc = new JavaSparkContext(conf);

        List<Integer> nums = new ArrayList<>();
        nums.add(1);
        nums.add(2);
        nums.add(3);
        nums.add(4);

        JavaRDD<Integer> numsRDD = sc.parallelize(nums);

        JavaRDD<Integer> mapRDD = numsRDD.map(new Function<Integer, Integer>() {
            @Override
            public Integer call(Integer value) throws Exception {
                return value * 2;
            }
        });

        mapRDD.collect().forEach(System.out::println);


        JavaRDD<String> fileRDD = sc.textFile("datas/apache.log");

        JavaRDD<String> urlRDD = fileRDD.map(new Function<String, String>() {
            @Override
            public String call(String line) throws Exception {
                return line.split(" ")[6];
            }
        });

        urlRDD.collect().forEach(System.out::println);


        sc.stop();
    }
}
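
For reference, both map calls above can be written more tersely with Java 8 lambdas; a minimal equivalent sketch, reusing numsRDD and fileRDD from the class above:

// Same behavior as the anonymous Function classes, just shorter.
JavaRDD<Integer> doubledRDD = numsRDD.map(v -> v * 2);
JavaRDD<String> urlsRDD = fileRDD.map(line -> line.split(" ")[6]);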

mapPartitions

package com.journey.core.rdd.transform;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/**
 * Sends the data to the compute node one partition at a time; the processing can be arbitrary, even filtering data away.
 *
 * What is the difference between map and mapPartitions?
 * Data-processing angle:
 * map executes on one element at a time within a partition, similar to serial processing, whereas mapPartitions operates on a whole partition as a batch.
 *
 * Functional angle:
 * map transforms or reshapes the source data but never changes the number of elements. mapPartitions receives an iterator and returns an iterator,
 * with no constraint that the element count stays the same, so it can add or drop data.
 *
 * Performance angle:
 * map, being element-at-a-time, is relatively slow; mapPartitions works batch-wise, so it is faster. However, mapPartitions can hold on to a
 * partition's memory for a long time, which may exhaust memory and cause out-of-memory errors. With limited memory, mapPartitions is not
 * recommended; use map instead.
 */
public class MapPartitionsRDD {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("MapPartitionsRDD")
                .setMaster("local[*]");

        JavaSparkContext sc = new JavaSparkContext(conf);

        List<Integer> nums = new ArrayList<>();
        nums.add(1);
        nums.add(2);
        nums.add(3);
        nums.add(4);

        JavaRDD<Integer> numsRDD = sc.parallelize(nums, 2);

        JavaRDD<Integer> mapPartitionsRDD = numsRDD.mapPartitions(new FlatMapFunction<Iterator<Integer>, Integer>() {
            @Override
            public Iterator<Integer> call(Iterator<Integer> iterator) throws Exception {
                // Note: this prints only twice. Why? Because there are two partitions, and each partition is processed exactly once
                System.out.println("xxxxxxxxxxx");
                List<Integer> result = new ArrayList<>();
                while (iterator.hasNext()) {
                    Integer num = iterator.next();
                    result.add(num * 2);
                }
                return result.iterator();
            }
        });

        mapPartitionsRDD.collect().forEach(System.out::println);

        // Compute the maximum value within each partition
        JavaRDD<Integer> maxPartitionValueRDD = mapPartitionsRDD.mapPartitions(new FlatMapFunction<Iterator<Integer>, Integer>() {
            @Override
            public Iterator<Integer> call(Iterator<Integer> iterator) throws Exception {

                List<Integer> result = new ArrayList<>();
                Integer maxValue = Integer.MIN_VALUE;
                while (iterator.hasNext()) {
                    Integer value = iterator.next();
                    if (value > maxValue) {
                        maxValue = value;
                    }
                }
                result.add(maxValue);
                return result.iterator();
            }
        });

        maxPartitionValueRDD.collect().forEach(System.out::println);


        sc.stop();
    }
}
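
The class comment above warns that mapPartitions can pin an entire partition in memory, and the ArrayList in the example is exactly that cost. A minimal sketch of a lazier alternative, wrapping the input iterator instead of materializing a list (reusing numsRDD from above):

// Lazy variant: elements are transformed one at a time as the output
// iterator is consumed, so the whole partition never sits in memory at once.
JavaRDD<Integer> lazyDoubledRDD = numsRDD.mapPartitions(iter -> new Iterator<Integer>() {
    @Override
    public boolean hasNext() { return iter.hasNext(); }

    @Override
    public Integer next() { return iter.next() * 2; }
});
lazyDoubledRDD.collect().forEach(System.out::println);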

mapPartitionsWithIndex

package com.journey.core.rdd.transform;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

/**
 * Sends the data to the compute node one partition at a time; the processing can be arbitrary, even filtering data away, and the index
 * of the current partition is available while processing.
 */
public class MapPartitionsWithIndexRDD {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("MapPartitionsWithIndexRDD")
                .setMaster("local[*]");

        JavaSparkContext sc = new JavaSparkContext(conf);

        List<Integer> nums = new ArrayList<>();
        nums.add(1);
        nums.add(2);
        nums.add(3);
        nums.add(4);

        JavaRDD<Integer> numsRDD = sc.parallelize(nums, 2);

        Function2<Integer, Iterator<Integer>, Iterator<Integer>> mpIndexFunction = new Function2<Integer, Iterator<Integer>, Iterator<Integer>>(){
            @Override
            public Iterator<Integer> call(Integer index, Iterator<Integer> iterator) throws Exception {
                if(index == 0){
                    return iterator;
                }
                // Return an empty iterator for every other partition
                return Collections.emptyIterator();
            }
        };

        // Note the second argument of mapPartitionsWithIndex: preservesPartitioning controls whether the existing partitioner is kept
        // The function object is declared outside the call, above
        JavaRDD<Integer> mpRDD = numsRDD.mapPartitionsWithIndex(mpIndexFunction, true);

        mpRDD.collect().forEach(System.out::println);

        sc.stop();
    }
}
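
A common use of the partition index is simply to see where each element lives; a small sketch reusing numsRDD from the class above:

// Tag every element with the index of the partition holding it.
JavaRDD<String> taggedRDD = numsRDD.mapPartitionsWithIndex((index, iter) -> {
    List<String> out = new ArrayList<>();
    while (iter.hasNext()) {
        out.add("partition " + index + " -> " + iter.next());
    }
    return out.iterator();
}, false);
taggedRDD.collect().forEach(System.out::println);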

flatMap

package com.journey.core.rdd.transform;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

/**
 * Flattens the input and then maps it, hence the name "flat map"; in plain terms, one input element can produce multiple output elements.
 */
public class FlatMapRDD {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("FlatMapRDD")
                .setMaster("local[*]");

        JavaSparkContext sc = new JavaSparkContext(conf);

        JavaRDD<String> fileRDD = sc.textFile("datas/wc");

        JavaRDD<String> wordRDD = fileRDD.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String line) throws Exception {
                return Arrays.stream(line.split(" ")).iterator();
            }
        });

        wordRDD.collect().forEach(System.out::println);

        List<ArrayList<Integer>> nums = new ArrayList<>();

        ArrayList<Integer> nums1 = new ArrayList<>();
        nums1.add(1);
        nums1.add(2);
        nums.add(nums1);

        ArrayList<Integer> nums2 = new ArrayList<>();
        nums2.add(3);
        nums2.add(4);
        nums.add(nums2);

        JavaRDD<ArrayList<Integer>> numsRDD = sc.parallelize(nums);

        JavaRDD<Integer> numsFlatMapRDD = numsRDD.flatMap(new FlatMapFunction<ArrayList<Integer>, Integer>() {
            @Override
            public Iterator<Integer> call(ArrayList<Integer> integers) throws Exception {
                return integers.iterator();
            }
        });

        numsFlatMapRDD.collect().forEach(System.out::println);


        sc.stop();
    }
}

mapValues

package com.journey.core.rdd.transform;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import scala.Tuple2;

import java.util.ArrayList;
import java.util.List;

/**
 * Operates on the values only; keys are left untouched.
 */
public class MapValuesRDD {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("MapValuesRDD")
                .setMaster("local[*]");

        JavaSparkContext sc = new JavaSparkContext(conf);


        List<Tuple2<String, Integer>> userInfos = new ArrayList<>();
        userInfos.add(Tuple2.apply("Alice", 300));
        userInfos.add(Tuple2.apply("zhangsan", 200));
        userInfos.add(Tuple2.apply("lisi", 309));
        userInfos.add(Tuple2.apply("wagnwu", 201));
        userInfos.add(Tuple2.apply("mayun", 234));
        userInfos.add(Tuple2.apply("haha", 223));

        JavaPairRDD<String, Integer> userInfosRDD = sc.parallelizePairs(userInfos, 2);

        // Give everyone a raise of 100
        JavaPairRDD<String, Integer>  userInfosSalaryAdd100 = userInfosRDD.mapValues(new Function<Integer, Integer>() {
            @Override
            public Integer call(Integer v1) throws Exception {
                return v1 + 100;
            }
        });

        userInfosSalaryAdd100.collect().forEach(System.out::println);

        sc.stop();
    }
}

glom

package com.journey.core.rdd.transform;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * Turns each partition, unchanged, into an in-memory array of the same element type; the partitioning itself is preserved.
 */
public class GlomRDD {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("GlomRDD")
                .setMaster("local[*]");

        JavaSparkContext sc = new JavaSparkContext(conf);

        List<Integer> nums = new ArrayList<>();
        nums.add(1);
        nums.add(2);
        nums.add(3);
        nums.add(4);

        JavaRDD<Integer> numsRDD = sc.parallelize(nums, 2);

        JavaRDD<List<Integer>> glomRDD = numsRDD.glom();
        JavaRDD<Integer> mapRDD = glomRDD.map(new Function<List<Integer>, Integer>() {
            @Override
            public Integer call(List<Integer> nums) throws Exception {
                return Collections.max(nums);
            }
        });

        List<Integer> resultList = mapRDD.collect();
        Integer result = resultList.stream().reduce(Integer::sum).orElse(0);
        System.out.println(result);

        sc.stop();
    }
}

filter

package com.journey.core.rdd.transform;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.List;

/**
 * Filters the data by the given predicate: records that match are kept, the rest are discarded. Filtering keeps the partition count
 * unchanged, but the data within partitions may become unbalanced; in production this can lead to data skew, so a filter is commonly
 * followed by a repartition.
 */
public class FilterRDD {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("FilterRDD")
                .setMaster("local[*]");

        JavaSparkContext sc = new JavaSparkContext(conf);


        JavaRDD<String> logFileRDD = sc.textFile("datas/apache.log");

        JavaRDD<String> filterRDD = logFileRDD.filter(new Function<String, Boolean>() {
            @Override
            public Boolean call(String value) throws Exception {
                return value.contains("7/05/2015");
            }
        });

        JavaRDD<String> mapRDD = filterRDD.map(new Function<String, String>() {
            @Override
            public String call(String value) throws Exception {
                String[] fields = value.split(" ");
                return fields[6];
            }
        });

        mapRDD.collect().forEach(System.out::println);

        sc.stop();
    }
}
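
As the class comment notes, a selective filter can leave partitions unbalanced; a minimal follow-up sketch on the filterRDD above, rebalancing before further work:

// repartition always shuffles, spreading the surviving records evenly again.
JavaRDD<String> rebalancedRDD = filterRDD.repartition(2);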

sample

package com.journey.core.rdd.transform;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

import java.util.ArrayList;
import java.util.List;

/**
 * Mainly used to take a quick look at how the data is distributed.
 */
public class SampleRDD {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("SampleRDD")
                .setMaster("local[*]");

        JavaSparkContext sc = new JavaSparkContext(conf);

        List<Integer> nums = new ArrayList<>();
        nums.add(1);
        nums.add(2);
        nums.add(3);
        nums.add(4);

        JavaRDD<Integer> numsRDD = sc.parallelize(nums);
        /**
         * First argument : whether the sample is drawn with replacement (true) or without (false)
         * Second argument : without replacement, the probability of keeping each element, in [0, 1];
         *                   with replacement, the expected number of times each element is drawn (may exceed 1)
         * Third argument : the random seed
         */
        JavaRDD<Integer> sampleRDD1 = numsRDD.sample(false, 0.5);
        JavaRDD<Integer> sampleRDD2 = numsRDD.sample(true, 3);
        sampleRDD1.collect().forEach(System.out::println);
        System.out.println("**************************");
        sampleRDD2.collect().forEach(System.out::println);

        sc.stop();
    }
}
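
For reproducible samples there is a three-argument overload that fixes the random seed; a small sketch on the same numsRDD:

// sample(withReplacement, fraction, seed): the seed makes runs repeatable.
JavaRDD<Integer> seededSampleRDD = numsRDD.sample(false, 0.5, 42L);
seededSampleRDD.collect().forEach(System.out::println);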

distinct

package com.journey.core.rdd.transform;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import java.util.ArrayList;
import java.util.List;

/**
 * Removes duplicate elements from the dataset.
 */
public class DistinctRDD {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("DistinctRDD")
                .setMaster("local[*]");

        JavaSparkContext sc = new JavaSparkContext(conf);

        List<Integer> nums = new ArrayList<>();
        nums.add(1);
        nums.add(1);
        nums.add(2);
        nums.add(3);
        nums.add(3);
        nums.add(1);

        JavaRDD<Integer> numsRDD = sc.parallelize(nums, 2);

        JavaRDD<Integer> distinctRDD = numsRDD.distinct(2);
        distinctRDD.collect().forEach(System.out::println);

        sc.stop();
    }
}
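
Under the hood, distinct is shuffle-based: conceptually it pairs every element with a dummy value, collapses duplicates with reduceByKey, and keeps only the keys. A rough sketch of that equivalence on the same numsRDD (scala.Tuple2 and JavaPairRDD imports assumed):

// Roughly what distinct() does internally: duplicates collapse because
// reduceByKey keeps a single value per key.
JavaRDD<Integer> manualDistinctRDD = numsRDD
        .mapToPair(v -> Tuple2.apply(v, 1))
        .reduceByKey((a, b) -> a)
        .keys();
manualDistinctRDD.collect().forEach(System.out::println);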

coalesce

package com.journey.core.rdd.transform;

import com.clearspring.analytics.util.Lists;
import org.apache.commons.collections.IteratorUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/**
 * Shrinks the number of partitions based on data volume, improving execution efficiency after a large dataset has been filtered down to a small one.
 * When a Spark job has too many small tasks, coalesce can merge partitions to reduce the partition count and the task-scheduling overhead.
 */
public class CoalesceRDD {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("CoalesceRDD")
                .setMaster("local[*]");

        JavaSparkContext sc = new JavaSparkContext(conf);

        List<Integer> nums = new ArrayList<>();
        nums.add(1);
        nums.add(2);
        nums.add(3);
        nums.add(4);
        nums.add(5);
        nums.add(6);

        JavaRDD<Integer> numsRDD = sc.parallelize(nums, 6);

        /**
         * Note that coalesce defaults to shuffle = false: shrinking simply merges existing partitions.
         * Without a shuffle, coalesce cannot increase the number of partitions.
         */
        JavaRDD<Integer> coalesceRDD = numsRDD.coalesce(2);

        coalesceRDD.saveAsTextFile("datas/output");
        sc.stop();
    }
}
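
To increase the partition count with coalesce, shuffle must be switched on explicitly; a one-line sketch on the same numsRDD:

// coalesce(10) alone would silently keep 6 partitions; with shuffle = true it really widens.
JavaRDD<Integer> widenedRDD = numsRDD.coalesce(10, true);
System.out.println(widenedRDD.getNumPartitions()); // 10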

repartition

package com.journey.core.rdd.transform;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import java.util.ArrayList;
import java.util.List;

/**
 * Internally this just calls coalesce with shuffle = true. Whether going from more partitions to fewer or from fewer to more,
 * repartition works either way, because it always goes through a shuffle.
 */
public class RepartitionRDD {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("RepartitionRDD")
                .setMaster("local[*]");

        JavaSparkContext sc = new JavaSparkContext(conf);

        List<Integer> nums = new ArrayList<>();
        nums.add(1);
        nums.add(2);
        nums.add(3);
        nums.add(4);
        nums.add(5);
        nums.add(6);

        JavaRDD<Integer> numsRDD = sc.parallelize(nums, 6);

        JavaRDD<Integer> repartitionRDD = numsRDD.repartition(10);

        repartitionRDD.saveAsTextFile("datas/output");
        sc.stop();
    }
}

intersection & union & subtract & zip

package com.journey.core.rdd.transform;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

import java.util.ArrayList;
import java.util.List;

/**
 * Set-style operators on two RDDs: intersection, union, subtract, and zip. intersection and subtract require both RDDs to have the
 * same element type; zip additionally requires the same number of partitions and the same number of elements per partition.
 */
public class IntersectionRDD {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("SortByRDD")
                .setMaster("local[*]");

        JavaSparkContext sc = new JavaSparkContext(conf);

        List<Integer> nums1 = new ArrayList<>();
        nums1.add(1);
        nums1.add(2);
        nums1.add(3);
        nums1.add(4);

        List<Integer> nums2 = new ArrayList<>();
        nums2.add(3);
        nums2.add(4);
        nums2.add(5);
        nums2.add(6);

        List<String> nums3 = new ArrayList<>();
        nums3.add("3");


        JavaRDD<Integer> nums1RDD = sc.parallelize(nums1,1);
        JavaRDD<Integer> nums2RDD = sc.parallelize(nums2,1);

        // The two RDDs must have the same element type
        JavaRDD<Integer> intersectionRDD = nums1RDD.intersection(nums2RDD);
        JavaRDD<Integer> unionRDD = nums1RDD.union(nums2RDD);
        // Same element type required
        JavaRDD<Integer> subtractRDD = nums1RDD.subtract(nums2RDD);
        // Same element type and same number of partitions required
        JavaPairRDD<Integer, Integer> zipRDD = nums1RDD.zip(nums2RDD);

        intersectionRDD.collect().forEach(System.out::println);
        System.out.println("******************************");
        unionRDD.collect().forEach(System.out::println);
        System.out.println("******************************");
        subtractRDD.collect().forEach(System.out::println);
        System.out.println("******************************");
        zipRDD.collect().forEach(System.out::println);

        sc.stop();
    }
}

partitionBy

package com.journey.core.rdd.transform;

import org.apache.spark.Partitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import scala.Tuple2;

import java.util.ArrayList;
import java.util.List;

/**
 * Repartitions the data using the given Partitioner. Spark's default partitioner is HashPartitioner.
 */
public class PartitionerByRDD {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("PartitionerByRDD")
                .setMaster("local[*]");

        JavaSparkContext sc = new JavaSparkContext(conf);

        List<Tuple2<String, String>> infos = new ArrayList<>();
        infos.add(Tuple2.apply("1305261989234", "zhangsan"));
        infos.add(Tuple2.apply("1505261989234", "lisi"));
        infos.add(Tuple2.apply("1305261982343", "wagnwu"));
        infos.add(Tuple2.apply("1505261382343", "zhaoliu"));

        // Keys starting with 130 go to one partition, keys starting with 150 to another
        // Note : for pair data, call parallelizePairs rather than parallelize
        JavaPairRDD<String, String> infosRDD = sc.parallelizePairs(infos, 2);

        JavaPairRDD<String, String> partitionByRDD = infosRDD.partitionBy(new Partitioner() {
            @Override
            public int numPartitions() {
                return 2;
            }

            @Override
            public int getPartition(Object key) {
                String item = key.toString();
                if (item.startsWith("130")) {
                    return 0;
                } else if (item.startsWith("150")) {
                    return 1;
                }
                return 0;
            }
        });

        partitionByRDD.collect().forEach(System.out::println);


        sc.stop();
    }
}
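
For comparison, the stock partitioner mentioned in the class comment; a one-line sketch on the same infosRDD:

// Spark's default: route each key by hashCode modulo the partition count.
JavaPairRDD<String, String> hashPartitionedRDD =
        infosRDD.partitionBy(new org.apache.spark.HashPartitioner(2));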

reduceByKey

package com.journey.core.rdd.transform;

import org.apache.spark.Partitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

import java.util.ArrayList;
import java.util.List;

/**
 * Aggregates the values that share the same key.
 */
public class ReduceByKeyRDD {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("ReduceByKeyRDD")
                .setMaster("local[*]");

        JavaSparkContext sc = new JavaSparkContext(conf);

        List<String> words = new ArrayList<>();
        words.add("Hello");
        words.add("Spark");
        words.add("Spark");
        words.add("World");

        JavaRDD<String> wordsRDD = sc.parallelize(words);
        JavaPairRDD<String, Integer> wordToPairRDD = wordsRDD.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String word) throws Exception {
                return Tuple2.apply(word, 1);
            }
        });

        JavaPairRDD<String, Integer> wordCountRDD = wordToPairRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });

        wordCountRDD.collect().forEach(System.out::println);

        sc.stop();
    }
}

groupByKey

package com.journey.core.rdd.transform;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

/**
 * What is the difference between reduceByKey and groupByKey?
 * Shuffle angle : both involve a shuffle, but reduceByKey pre-aggregates (combines) values with the same key within each partition before
 * the shuffle, which reduces the amount of data spilled to disk; groupByKey only groups and reduces nothing, so reduceByKey performs better.
 *
 * Functional angle : reduceByKey both groups and aggregates, while groupByKey can only group. For group-and-aggregate workloads reduceByKey
 * is recommended; if you only need grouping without aggregation, groupByKey is the one to use.
 */
public class GroupByKeyRDD {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("GroupByKeyRDD")
                .setMaster("local[*]");

        JavaSparkContext sc = new JavaSparkContext(conf);

        List<String> words = new ArrayList<>();
        words.add("Hello");
        words.add("Spark");
        words.add("Spark");
        words.add("World");

        JavaRDD<String> wordsRDD = sc.parallelize(words);
        JavaPairRDD<String, Integer> wordToPairRDD = wordsRDD.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String word) throws Exception {
                return Tuple2.apply(word, 1);
            }
        });

        JavaPairRDD<String, Iterable<Integer>> wordGroupByRDD = wordToPairRDD.groupByKey();

        JavaPairRDD<String, Integer> wordCountRDD = wordGroupByRDD.mapValues(new Function<Iterable<Integer>, Integer>() {
            @Override
            public Integer call(Iterable<Integer> iterable) throws Exception {
                return ((Collection<?>) iterable).size();
            }
        });

        wordCountRDD.collect().forEach(System.out::println);


        sc.stop();
    }
}

aggregateByKey

package com.journey.core.rdd.transform;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

/**
 * First argument : the initial (zero) value
 * Second argument : the intra-partition combine function
 * Third argument : the inter-partition combine function
 */
public class AggregateByKeyRDD {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("AggregateByKeyRDD")
                .setMaster("local[*]");

        JavaSparkContext sc = new JavaSparkContext(conf);

        List<Tuple2<String, Integer>> words = new ArrayList<>();
        words.add(Tuple2.apply("Hello", 3));
        words.add(Tuple2.apply("Spark", 2));
        words.add(Tuple2.apply("Hello", 10));
        words.add(Tuple2.apply("Spark", 17));

        JavaPairRDD<String, Integer> wordsRDD = sc.parallelizePairs(words, 2);



        // The initial value of aggregateByKey takes part in the intra-partition computation only
        JavaPairRDD<String, Integer> aggregateByKeyRDD = wordsRDD.aggregateByKey(10,
                new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        }, new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });

        aggregateByKeyRDD.collect().forEach(System.out::println);

        // Same initial value; this time the intra-partition rule takes the max instead of summing
        JavaPairRDD<String, Integer> aggregateByKeyRDD2 = wordsRDD.aggregateByKey(10,
                new Function2<Integer, Integer, Integer>() {
                    @Override
                    public Integer call(Integer v1, Integer v2) throws Exception {
                        // Intra-partition: take the maximum
                        return Math.max(v1, v2);
                    }
                }, new Function2<Integer, Integer, Integer>() {
                    @Override
                    public Integer call(Integer v1, Integer v2) throws Exception {
                        return v1 + v2;
                    }
                });

        aggregateByKeyRDD2.collect().forEach(System.out::println);

        sc.stop();
    }
}
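
To make the initial-value behavior concrete: assuming parallelizePairs splits the four tuples in order into [("Hello",3), ("Spark",2)] and [("Hello",10), ("Spark",17)], the sum version yields Hello = (10+3) + (10+10) = 33 and Spark = (10+2) + (10+17) = 39, while the max-then-sum version yields Hello = max(10,3) + max(10,10) = 20 and Spark = max(10,2) + max(10,17) = 27; the zero value 10 is applied once per partition, never between partitions.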

foldByKey

package com.journey.core.rdd.transform;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import scala.Tuple2;

import java.util.ArrayList;
import java.util.List;

/**
 * First argument : the initial value
 * Second argument : the combine function, applied both within and between partitions (the two rules are the same)
 */
public class FoldByKeyRDD {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("FoldByKeyRDD")
                .setMaster("local[*]");

        JavaSparkContext sc = new JavaSparkContext(conf);

        List<Tuple2<String, Integer>> words = new ArrayList<>();
        words.add(Tuple2.apply("Hello", 3));
        words.add(Tuple2.apply("Spark", 2));
        words.add(Tuple2.apply("Hello", 10));
        words.add(Tuple2.apply("Spark", 17));

        JavaPairRDD<String, Integer> wordsRDD = sc.parallelizePairs(words, 2);

        JavaPairRDD<String, Integer> foldByKeyRDD = wordsRDD.foldByKey(10,
                new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });

        foldByKeyRDD.collect().forEach(System.out::println);


        sc.stop();
    }
}

combineByKey

package com.journey.core.rdd.transform;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import scala.Tuple2;

import java.util.ArrayList;
import java.util.List;

/**
 * Computes an average.
 * First argument : only converts the first value of each key into the accumulator structure
 * Second argument : the intra-partition combine function
 * Third argument : the inter-partition combine function
 *
 * reduceByKey : the first value of each key enters the computation untouched; intra- and inter-partition rules are the same
 * foldByKey : the first value of each key is combined with the initial value inside the partition; intra- and inter-partition rules are the same
 * aggregateByKey : the first value of each key is combined with the initial value inside the partition; intra- and inter-partition rules may differ
 * combineByKey : when the data's structure doesn't fit the computation, the first value of each key can be converted into a new structure;
 * intra- and inter-partition rules may differ
 */
public class CombineByKeyRDD {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("CombineByKeyRDD")
                .setMaster("local[*]");

        JavaSparkContext sc = new JavaSparkContext(conf);

        List<Tuple2<String, Integer>> words = new ArrayList<>();
        words.add(Tuple2.apply("Hello", 3));
        words.add(Tuple2.apply("Spark", 2));
        words.add(Tuple2.apply("Hello", 3));
        words.add(Tuple2.apply("Spark", 2));
        words.add(Tuple2.apply("Spark", 2));
        words.add(Tuple2.apply("Spark", 2));

        JavaPairRDD<String, Integer> wordsRDD = sc.parallelizePairs(words, 2);

        JavaPairRDD<String, Tuple2<Integer, Integer>> combineByKeyRDD = wordsRDD.combineByKey(new Function<Integer, Tuple2<Integer, Integer>>() {
            @Override
            public Tuple2<Integer, Integer> call(Integer v1) throws Exception {
                return Tuple2.apply(v1, 1);
            }
        }, new Function2<Tuple2<Integer, Integer>, Integer, Tuple2<Integer, Integer>>() {
            @Override
            public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> v1, Integer v2) throws Exception {
                return Tuple2.apply(v1._1 + v2, v1._2 + 1);
            }
        }, new Function2<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>() {
            @Override
            public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> v1, Tuple2<Integer, Integer> v2) throws Exception {
                return Tuple2.apply(v1._1 + v2._1, v1._2 + v2._2);
            }
        });

        combineByKeyRDD.collect().forEach(t -> {
            String key = t._1;
            Tuple2<Integer, Integer> tuple = t._2;
            System.out.println(key + ":" + tuple._1 / tuple._2);
        });

        JavaPairRDD<String, Integer> wordCountRDD = wordsRDD.combineByKey(new Function<Integer, Integer>() {
            @Override
            public Integer call(Integer v1) throws Exception {
                return v1;
            }
        }, new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        }, new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });

        wordCountRDD.collect().forEach(System.out::println);

        sc.stop();
    }
}

sortByKey

package com.journey.core.rdd.transform;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import scala.Tuple2;

import java.util.ArrayList;
import java.util.List;

/**
 *  Sorts the RDD by key.
 */
public class SortByKeyRDD {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("CombineByKeyRDD")
                .setMaster("local[*]");

        JavaSparkContext sc = new JavaSparkContext(conf);

        List<Tuple2<String, Integer>> words = new ArrayList<>();
        words.add(Tuple2.apply("Alice", 3));
        words.add(Tuple2.apply("zhangsan", 2));
        words.add(Tuple2.apply("lisi", 3));
        words.add(Tuple2.apply("wagnwu", 2));
        words.add(Tuple2.apply("mayun", 2));
        words.add(Tuple2.apply("haha", 2));

        JavaPairRDD<String, Integer> wordsRDD = sc.parallelizePairs(words, 2);

        // Ascending by default; descending order or a custom comparator can also be used (see the sketch after this class)
        JavaPairRDD<String, Integer> sortWordsRDD = wordsRDD.sortByKey(true);

        sortWordsRDD.collect().forEach(System.out::println);

        sc.stop();
    }
}
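
A minimal sketch of the two variants mentioned in the comment, on the same wordsRDD; the comparator is shipped to executors, so it must be Serializable (java.util.Comparator and java.io.Serializable imports assumed):

// Descending by key.
JavaPairRDD<String, Integer> descRDD = wordsRDD.sortByKey(false);

// Custom order (by key length); the intersection cast makes the lambda Serializable.
JavaPairRDD<String, Integer> byLengthRDD = wordsRDD.sortByKey(
        (Comparator<String> & Serializable) (a, b) -> Integer.compare(a.length(), b.length()),
        true);
descRDD.collect().forEach(System.out::println);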

join & leftOuterJoin

package com.journey.core.rdd.transform;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.Optional;
import scala.Tuple2;

import java.util.ArrayList;
import java.util.List;

/**
 * Called on RDDs of types (K, V) and (K, W); returns an RDD of (K, (V, W)) joining all elements that share a key.
 */
public class JoinRDD {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("JoinRDD")
                .setMaster("local[*]");

        JavaSparkContext sc = new JavaSparkContext(conf);

        List<Tuple2<Integer, String>> userInfos = new ArrayList<>();
        userInfos.add(Tuple2.apply(1, "zhangsan"));
        userInfos.add(Tuple2.apply(2, "lisi"));
        userInfos.add(Tuple2.apply(3, "wangwu"));


        List<Tuple2<Integer, String>> orders = new ArrayList<>();
        orders.add(Tuple2.apply(1, "iphone pad"));
        orders.add(Tuple2.apply(1, "mac pad"));
        orders.add(Tuple2.apply(2, "java book"));

        JavaPairRDD<Integer, String> userInfosRDD = sc.parallelizePairs(userInfos, 2);
        JavaPairRDD<Integer, String> ordersRDD = sc.parallelizePairs(orders, 2);

        JavaPairRDD<Integer, Tuple2<String, String>> joinRDD = userInfosRDD.join(ordersRDD);

        joinRDD.collect().forEach(System.out::println);

        // Left outer join: every record on the left appears; a missing right side becomes Optional.empty
        JavaPairRDD<Integer, Tuple2<String, Optional<String>>> leftOuterJoinRDD = userInfosRDD.leftOuterJoin(ordersRDD);
        leftOuterJoinRDD.collect().forEach(System.out::println);


        sc.stop();
    }
}
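
A small sketch of unwrapping the Optional on the driver side; user 3 has no orders, so the right side is absent (the "<no order>" marker is just an illustration):

leftOuterJoinRDD.collect().forEach(t -> {
    // t is (userId, (userName, Optional<order>))
    String order = t._2._2.isPresent() ? t._2._2.get() : "<no order>";
    System.out.println(t._1 + " -> " + t._2._1 + " : " + order);
});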

cogroup

package com.journey.core.rdd.transform;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.Optional;
import scala.Tuple2;

import java.util.ArrayList;
import java.util.List;

/**
 * Values with the same key from both RDDs are grouped together; each side's values arrive as an Iterable.
 */
public class CogroupRDD {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("CogroupRDD")
                .setMaster("local[*]");

        JavaSparkContext sc = new JavaSparkContext(conf);

        List<Tuple2<Integer, String>> userInfos = new ArrayList<>();
        userInfos.add(Tuple2.apply(1, "zhangsan"));
        userInfos.add(Tuple2.apply(2, "lisi"));
        userInfos.add(Tuple2.apply(3, "wangwu"));


        List<Tuple2<Integer, String>> orders = new ArrayList<>();
        orders.add(Tuple2.apply(1, "iphone pad"));
        orders.add(Tuple2.apply(1, "mac pad"));
        orders.add(Tuple2.apply(2, "java book"));

        JavaPairRDD<Integer, String> userInfosRDD = sc.parallelizePairs(userInfos, 2);
        JavaPairRDD<Integer, String> ordersRDD = sc.parallelizePairs(orders, 2);

        JavaPairRDD<Integer, Tuple2<Iterable<String>, Iterable<String>>> cogroupRDD = userInfosRDD.cogroup(ordersRDD);

        cogroupRDD.collect().forEach(System.out::println);

        sc.stop();
    }
}

Top N example

package com.journey.core.rdd.transform;

import org.apache.commons.collections.IteratorUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;
import scala.Tuple3;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;


/**
 * Serialization stack:
 *     - object not serializable (class: java.util.ArrayList$SubList, value: [(16,26), (26,25), (1,23)])
 *     - field (class: scala.Tuple2, name: _2, type: class java.lang.Object)
 *     - object (class scala.Tuple2, (7,[(16,26), (26,25), (1,23)]))
 *     - element of array (index: 0)
 *     - array (class [Lscala.Tuple2;, size 5)
 *     at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:41)
 *     at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
 *     at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
 *     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:489)
 *     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
 *     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
 *     at java.base/java.lang.Thread.run(Thread.java:835)
 * 23/05/09 20:29:01 ERROR Executor: Exception in task 0.0 in stage 2.0 (TID 4)
 * java.io.NotSerializableException: java.util.ArrayList$SubList
 * Serialization stack:
 *
 * The fix :
 * It's because, List returned by subList() method is an instance of 'RandomAccessSubList' which is not serializable.
 * Therefore you need to create a new ArrayList object from the list returned by the subList().
 */
public class Demo {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("Demo")
                .setMaster("local[*]");

        JavaSparkContext sc = new JavaSparkContext(conf);

        JavaRDD<String> logRDD = sc.textFile("datas/agent.log");


        JavaPairRDD<Tuple2<String, String>, Integer> provinceAdRDD = logRDD.mapToPair(new PairFunction<String, Tuple2<String, String>, Integer>() {
            @Override
            public Tuple2<Tuple2<String, String>, Integer> call(String line) throws Exception {
                String[] fields = line.split(" ");
                String province = fields[1];
                String ad = fields[4];
                return Tuple2.apply(Tuple2.apply(province, ad), 1);
            }
        });

        JavaPairRDD<Tuple2<String, String>, Integer> provinceAdToCountRDD = provinceAdRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });


        JavaPairRDD<String, Tuple2<String, Integer>> provinceToAdCountRDD = provinceAdToCountRDD.mapToPair(new PairFunction<Tuple2<Tuple2<String, String>, Integer>, String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Tuple2<String, Integer>> call(Tuple2<Tuple2<String, String>, Integer> value) throws Exception {
                return Tuple2.apply(value._1._1, Tuple2.apply(value._1._2, value._2));
            }
        });

        JavaPairRDD<String, Iterable<Tuple2<String, Integer>>> provinceToAdGroupRDD = provinceToAdCountRDD.groupByKey();

        // Sort within each group and take the group's top N
        JavaPairRDD<String , Iterable<Tuple2<String , Integer>>> provinceToAdTop3RDD = provinceToAdGroupRDD.mapToPair(new PairFunction<Tuple2<String, Iterable<Tuple2<String, Integer>>>, String, Iterable<Tuple2<String, Integer>>>() {
            @Override
            public Tuple2<String, Iterable<Tuple2<String, Integer>>> call(Tuple2<String, Iterable<Tuple2<String, Integer>>> iterable) throws Exception {
                List<Tuple2<String, Integer>> result = IteratorUtils.toList(iterable._2.iterator());
                Collections.sort(result, new Comparator<Tuple2<String, Integer>>() {
                    @Override
                    public int compare(Tuple2<String, Integer> o1, Tuple2<String, Integer> o2) {
                        return o2._2 - o1._2;
                    }
                });
                // Important : wrap the slice as new ArrayList<>(result.subList(0, 3)); the view returned by subList() is not serializable
                return Tuple2.apply(iterable._1, new ArrayList<>(result.subList(0, 3)));
            }
        });

//        provinceToAdTop3RDD.foreach(new VoidFunction<Tuple2<String, Iterable<Tuple2<String, Integer>>>>() {
//            @Override
//            public void call(Tuple2<String, Iterable<Tuple2<String, Integer>>> stringIterableTuple2) throws Exception {
//                System.out.println(stringIterableTuple2);
//            }
//        });

        provinceToAdTop3RDD.collect().forEach(System.out::println);


        sc.stop();
    }
}

2. Action operators

reduce

package com.journey.core.rdd.action;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;

import java.util.ArrayList;
import java.util.List;

/**
 * Aggregates all elements of the RDD: first within each partition, then across partitions.
 */
public class ReduceRDD {

    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf()
                .setAppName("ReduceRDD")
                .setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(sparkConf);

        List<Integer> nums = new ArrayList<>();
        nums.add(1);
        nums.add(2);
        nums.add(3);
        nums.add(4);

        JavaRDD<Integer> numsRDD = sc.parallelize(nums, 2);

        Integer result = numsRDD.reduce(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });
        System.out.println(result);


        sc.stop();
    }
}

collect

package com.journey.core.rdd.action;

import org.apache.spark.SparkConf;
import org.apache.spark.SparkJobInfo;
import org.apache.spark.SparkStageInfo;
import org.apache.spark.api.java.JavaFutureAction;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;

import java.util.ArrayList;
import java.util.List;

/**
 * collect pulls the data back to the Driver for aggregation. Note : with a large dataset this can exhaust the Driver's memory.
 */
public class CollectRDD {

    public static void main(String[] args) throws Exception {
        SparkConf sparkConf = new SparkConf()
                .setAppName("CollectRDD")
                .setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(sparkConf);

        List<Integer> nums = new ArrayList<>();
        nums.add(1);
        nums.add(2);
        nums.add(3);
        nums.add(4);

        JavaRDD<Integer> numsRDD = sc.parallelize(nums, 2);

        // Synchronous collect
//        numsRDD.collect().forEach(System.out::println);

        // Asynchronous collect
        JavaFutureAction<List<Integer>> jobFuture = numsRDD.collectAsync();
        while (!jobFuture.isDone()) {
            Thread.sleep(1000);  // 1 second
            List<Integer> jobIds = jobFuture.jobIds();
            if (jobIds.isEmpty()) {
                continue;
            }
            int currentJobId = jobIds.get(jobIds.size() - 1);
            SparkJobInfo jobInfo = sc.statusTracker().getJobInfo(currentJobId);
            SparkStageInfo stageInfo = sc.statusTracker().getStageInfo(jobInfo.stageIds()[0]);
            System.out.println(stageInfo.numTasks() + " tasks total: " + stageInfo.numActiveTasks() +
                    " active, " + stageInfo.numCompletedTasks() + " complete");
        }

        if (jobFuture.isDone()) {
            List<Integer> result = jobFuture.get();
            System.out.println(result);
        }

        sc.stop();
    }
}

count

package com.journey.core.rdd.action;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import java.util.ArrayList;
import java.util.List;

/**
 * Counts the number of elements.
 */
public class CountRDD {

    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf()
                .setAppName("CountRDD")
                .setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(sparkConf);

        List<Integer> nums = new ArrayList<>();
        nums.add(1);
        nums.add(2);
        nums.add(3);
        nums.add(4);

        JavaRDD<Integer> numsRDD = sc.parallelize(nums, 2);

        long count = numsRDD.count();
        System.out.println(count);

        sc.stop();
    }
}

first

package com.journey.core.rdd.action;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import java.util.ArrayList;
import java.util.List;

/**
 * Returns the first element of the RDD.
 */
public class FirstRDD {

    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf()
                .setAppName("FirstRDD")
                .setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(sparkConf);

        List<Integer> nums = new ArrayList<>();
        nums.add(1);
        nums.add(2);
        nums.add(3);
        nums.add(4);

        JavaRDD<Integer> numsRDD = sc.parallelize(nums, 2);

        long firstItem = numsRDD.first();
        System.out.println(firstItem);

        sc.stop();
    }
}

take

package com.journey.core.rdd.action;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import java.util.ArrayList;
import java.util.List;

/**
 * Returns the first n elements of the RDD.
 */
public class TakeRDD {

    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf()
                .setAppName("TakeRDD")
                .setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(sparkConf);

        List<Integer> nums = new ArrayList<>();
        nums.add(1);
        nums.add(2);
        nums.add(3);
        nums.add(4);

        JavaRDD<Integer> numsRDD = sc.parallelize(nums, 2);

        List<Integer> items = numsRDD.take(3);
        System.out.println(items);

        sc.stop();
    }
}

takeOrdered

package com.journey.core.rdd.action;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import java.util.ArrayList;
import java.util.List;

/**
 * Returns the first n elements of the RDD after sorting.
 */
public class TakeOrderedRDD {

    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf()
                .setAppName("TakeOrderedRDD")
                .setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(sparkConf);

        List<Integer> nums = new ArrayList<>();
        nums.add(10);
        nums.add(22);
        nums.add(3);
        nums.add(40);

        JavaRDD<Integer> numsRDD = sc.parallelize(nums, 2);

        // Ascending by default; a comparator can be passed in (see the sketch after this class)
        List<Integer> items = numsRDD.takeOrdered(2);
        System.out.println(items);

        sc.stop();
    }
}
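
A sketch of the comparator variant on the same numsRDD; the comparator runs on executors, so it must be Serializable, which Collections.reverseOrder() already is:

// Largest two instead of smallest two.
List<Integer> top2 = numsRDD.takeOrdered(2, java.util.Collections.reverseOrder());
System.out.println(top2); // [40, 22]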

aggregate

package com.journey.core.rdd.action;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;

import java.util.ArrayList;
import java.util.List;

/**
 * The initial value takes part in the intra-partition aggregation and then again in the inter-partition aggregation, unlike
 * aggregateByKey, where the initial value participates in the intra-partition computation only.
 */
public class AggregateRDD {

    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf()
                .setAppName("AggregateRDD")
                .setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(sparkConf);

        List<Integer> nums = new ArrayList<>();
        nums.add(10);
        nums.add(10);

        JavaRDD<Integer> numsRDD = sc.parallelize(nums, 2);

        /**
         * Partition 1 (intra-partition) : initial value (10) + 10
         * Partition 2 (intra-partition) : initial value (10) + 10
         *
         * Across partitions : initial value (10) + 20 + 20
         *
         * So note : both aggregateByKey and aggregate depend on the partitioning; with a different number of partitions,
         * the initial value enters the computation a different number of times
         */
        Integer sum = numsRDD.aggregate(10, new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        }, new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });
        System.out.println(sum);

        sc.stop();
    }
}

fold

package com.journey.core.rdd.action;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;

import java.util.ArrayList;
import java.util.List;

/**
 * Like aggregate, except the intra-partition and inter-partition functions must be the same.
 */
public class FoldRDD {

    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf()
                .setAppName("FoldRDD")
                .setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(sparkConf);

        List<Integer> nums = new ArrayList<>();
        nums.add(10);
        nums.add(10);

        JavaRDD<Integer> numsRDD = sc.parallelize(nums, 2);

        Integer sum = numsRDD.fold(10, new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });

        System.out.println(sum);

        sc.stop();
    }
}

countByKey

package com.journey.core.rdd.action;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaFutureAction;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import scala.Tuple2;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * Counts how many times each key occurs.
 */
public class CountByKeyRDD {

    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf()
                .setAppName("CountByKeyRDD")
                .setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(sparkConf);

        List<Tuple2<String, Integer>> userInfos = new ArrayList<>();
        userInfos.add(Tuple2.apply("zhangsan", 23));
        userInfos.add(Tuple2.apply("lisi", 30));

        JavaPairRDD<String, Integer> userInfosRDD = sc.parallelizePairs(userInfos, 2);

        Map<String, Long> countByKey = userInfosRDD.countByKey();
        System.out.println(countByKey);

        sc.stop();
    }
}

save

package com.journey.core.rdd.action;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * Save-related operators.
 */
public class SaveRDD {

    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf()
                .setAppName("SaveRDD")
                .setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(sparkConf);

        List<Tuple2<String, Integer>> userInfos = new ArrayList<>();
        userInfos.add(Tuple2.apply("zhangsan", 23));
        userInfos.add(Tuple2.apply("lisi", 30));

        JavaPairRDD<String, Integer> userInfosRDD = sc.parallelizePairs(userInfos, 2);

        // Save as a text file
        userInfosRDD.saveAsTextFile("datas/output1");
        // Serialize the records and save as an object file
        userInfosRDD.saveAsObjectFile("datas/output2");

        sc.stop();
    }
}

foreach

package com.journey.core.rdd.action;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.VoidFunction;

import java.util.ArrayList;
import java.util.List;

/**
 * foreach differs from collect: collect pulls the data back to the Driver, while foreach runs directly on the Executors,
 * e.g. to print or write out each element.
 */
public class ForeachRDD {

    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf()
                .setAppName("ForeachRDD")
                .setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(sparkConf);

        List<Integer> nums = new ArrayList<>();
        nums.add(1);
        nums.add(2);
        nums.add(3);
        nums.add(4);

        JavaRDD<Integer> numsRDD = sc.parallelize(nums, 2);

        numsRDD.foreach(new VoidFunction<Integer>() {
            @Override
            public void call(Integer value) throws Exception {
                System.out.println(value);
            }
        });

        sc.stop();
    }
}
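
A close relative is foreachPartition, which calls the function once per partition rather than once per element; the usual motivation is amortizing per-partition setup such as opening a database connection (the setup step below is only a hypothetical placeholder):

// One call per partition; iterate that partition's elements inside.
numsRDD.foreachPartition(iter -> {
    // e.g. open one connection per partition here (hypothetical)
    while (iter.hasNext()) {
        System.out.println(iter.next());
    }
});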

If you found this useful, please like and follow. Many thanks!
