BIGDATA: SPARK

EXPERIMENT-10: PySpark RDDs
(i) What are RDDs? (ii) Ways to create RDDs: (a) parallelized collections, (b) external dataset, (c) existing RDDs. (iii) Spark RDD operations: count(), foreach(), collect(), join(), cache().

What are RDDs?
RDDs (Resilient Distributed Datasets) are the fundamental data structure in Apache Spark. An RDD is a collection of elements that can be split across multiple nodes in the cluster for parallel processing.

Ways to Create RDDs
There are three main ways to create RDDs.

i) Parallelized Collections
You can create an RDD from a Python list or other iterable using the parallelize() method.

data = [1, 2, 3, 4, 5]
rdd = sc.parallelize(data)

ii) External Dataset
You can create an RDD from an external dataset, such as a text file, using the textFile() method.

rdd = sc.textFile("data.txt")

iii) Existing RDDs
You can create a new RDD from an existing RDD using transformation operations such as map(), filter(), and flatMap() (see the short sketch given after the groupBy example below).

iv) Spark RDD Operations
Here are some common RDD operations:

1. count()
Returns the number of elements in the RDD.

rdd = sc.parallelize([1, 2, 3, 4, 5])
count = rdd.count()
print(count)  # Output: 5

2. foreach()
Applies a function to each element in the RDD.

rdd = sc.parallelize([1, 2, 3, 4, 5])
rdd.foreach(lambda x: print(x))

3. collect()
Returns all elements in the RDD as a Python list.

rdd = sc.parallelize([1, 2, 3, 4, 5])
data = rdd.collect()
print(data)  # Output: [1, 2, 3, 4, 5]

4. join()
Joins two pair RDDs based on a common key.

rdd1 = sc.parallelize([(1, "John"), (2, "Jane")])
rdd2 = sc.parallelize([(1, 25), (2, 30)])
joined_rdd = rdd1.join(rdd2)
print(joined_rdd.collect())  # Output: [(1, ('John', 25)), (2, ('Jane', 30))]

5. cache()
Caches the RDD in memory for faster access.

rdd = sc.parallelize([1, 2, 3, 4, 5])
rdd.cache()

BIGDATA: SPARK

EXPERIMENT-11: Perform PySpark transformations
(i) map and flatMap. (ii) Remove the words which are not necessary to analyze this text. (iii) groupBy. (iv) What if we want to calculate how many times each word occurs in the corpus? (v) How do I perform a task (say count the words 'spark' and 'apache' in rdd3) separately on each partition and get the output of the task performed on these partitions? (vi) Union of RDDs. (vii) Join two pair RDDs based on their key.

(i) map and flatMap
map(): Applies a function to each element in the RDD and returns a new RDD with the results.

rdd = sc.parallelize([1, 2, 3, 4, 5])
mapped_rdd = rdd.map(lambda x: x * 2)
print(mapped_rdd.collect())  # Output: [2, 4, 6, 8, 10]

flatMap(): Similar to map(), but each input element can be mapped to zero or more output elements.

rdd = sc.parallelize(["hello world", "spark apache"])
flat_mapped_rdd = rdd.flatMap(lambda x: x.split())
print(flat_mapped_rdd.collect())  # Output: ['hello', 'world', 'spark', 'apache']

(ii) Removing Unnecessary Words
To remove unnecessary words (stop words), split the text into words and use the filter() transformation to drop them.

rdd = sc.parallelize(["hello world", "spark apache", "the quick brown fox"])
stop_words = ["the", "quick", "brown", "fox"]
filtered_rdd = rdd.flatMap(lambda line: line.split()).filter(lambda word: word not in stop_words)
print(filtered_rdd.collect())  # Output: ['hello', 'world', 'spark', 'apache']

(iii) groupBy
The groupBy() transformation groups the elements of the RDD by a key function.

rdd = sc.parallelize([("spark", 1), ("apache", 2), ("spark", 3)])
grouped_rdd = rdd.groupBy(lambda x: x[0])
print(grouped_rdd.mapValues(list).collect())
# Output: [('spark', [('spark', 1), ('spark', 3)]), ('apache', [('apache', 2)])]
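Experiment-10's item on creating RDDs from existing RDDs (iii above) gives no code, so here is a minimal sketch, assuming only an active SparkContext sc; the variable names (base_rdd, doubled_rdd, large_rdd) are illustrative and not from the original notes.

base_rdd = sc.parallelize([1, 2, 3, 4, 5, 6])
# Each transformation returns a new RDD derived from the existing one
doubled_rdd = base_rdd.map(lambda x: x * 2)      # new RDD: [2, 4, 6, 8, 10, 12]
large_rdd = doubled_rdd.filter(lambda x: x > 6)  # new RDD: [8, 10, 12]
print(large_rdd.collect())  # Output: [8, 10, 12]

Because transformations are lazy, neither doubled_rdd nor large_rdd is actually computed until an action such as collect() is called.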
(iv) Counting Word Frequencies
To count how many times each word occurs in the corpus, you can use the flatMap(), map(), reduceByKey(), and sortBy() transformations.

rdd = sc.parallelize(["hello world", "spark apache", "hello world"])
word_rdd = rdd.flatMap(lambda x: x.split()).map(lambda x: (x, 1))
word_count_rdd = word_rdd.reduceByKey(lambda x, y: x + y)
sorted_word_count_rdd = word_count_rdd.sortBy(lambda x: x[1], ascending=False)
print(sorted_word_count_rdd.collect())
# Output: [('hello', 2), ('world', 2), ('spark', 1), ('apache', 1)]

(v) Performing a Task on Each Partition
To perform a task separately on each partition, you can use the mapPartitions() transformation (a sketch that counts 'spark' and 'apache' per partition appears at the end of these notes).

rdd = sc.parallelize([1, 2, 3, 4, 5], 2)

def process_partition(partition):
    # Perform the task (here, a sum) on one partition
    return [sum(partition)]

result_rdd = rdd.mapPartitions(process_partition)
print(result_rdd.collect())  # Output: [3, 12]  (the partitions are [1, 2] and [3, 4, 5])

(vi) Union of RDDs
The union() transformation returns a new RDD containing all elements from both RDDs.

rdd1 = sc.parallelize([1, 2, 3])
rdd2 = sc.parallelize([4, 5, 6])
union_rdd = rdd1.union(rdd2)
print(union_rdd.collect())  # Output: [1, 2, 3, 4, 5, 6]

(vii) Joining Two Pair RDDs by Key
The join() transformation returns a new RDD containing all pairs of elements with matching keys.

rdd1 = sc.parallelize([("spark", 1), ("apache", 2)])
rdd2 = sc.parallelize([("spark", 3), ("hadoop", 4)])
joined_rdd = rdd1.join(rdd2)
print(joined_rdd.collect())  # Output: [('spark', (1, 3))]
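The mapPartitions() example in (v) sums numbers, while the question as stated asks to count the words 'spark' and 'apache' separately on each partition of rdd3. Here is a minimal sketch, assuming rdd3 is built as a small word RDD with two partitions (the contents and partition count are illustrative); it uses mapPartitionsWithIndex() so each partition's result is tagged with its partition number.

rdd3 = sc.parallelize(["spark", "apache", "spark", "hadoop", "apache", "spark"], 2)

def count_spark_apache(index, partition):
    # Count only 'spark' and 'apache' within this one partition
    counts = {"spark": 0, "apache": 0}
    for word in partition:
        if word in counts:
            counts[word] += 1
    yield (index, counts)

per_partition_counts = rdd3.mapPartitionsWithIndex(count_spark_apache)
print(per_partition_counts.collect())
# Output (exact split depends on partitioning):
# [(0, {'spark': 2, 'apache': 1}), (1, {'spark': 1, 'apache': 1})]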