
BIGDATA:SPARK

EXPERIMENT-10:

PySpark RDDs

(1) What are RDDs?

(2) Ways to create RDDs:
    (i) parallelized collections
    (ii) external datasets
    (iii) existing RDDs

(iv) Spark RDD operations: count(), foreach(), collect(), join(), cache()

What are RDDs?

RDDs (Resilient Distributed Datasets) are the fundamental data structure in Apache Spark. An RDD is an immutable, fault-tolerant collection of elements that can be split across multiple nodes in the cluster for parallel processing.
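All of the examples below assume a running SparkContext named sc (in the pyspark shell, sc is created for you automatically). A minimal sketch of creating one in local mode, with an arbitrary application name:

from pyspark import SparkContext

sc = SparkContext(master="local[*]", appName="rdd-experiments")  # local mode, one worker thread per core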

Ways to Create RDDs

There are three main ways to create RDDs:

i). Parallelized Collections

You can create an RDD from a Python list or other iterable using the SparkContext's parallelize() method.

data = [1, 2, 3, 4, 5]
rdd = sc.parallelize(data)

ii). External Dataset

You can create an RDD from an external dataset, such as a text file, using the textFile() method.

rdd = sc.textFile("data.txt")

iii). Existing RDDs

You can create a new RDD from an existing RDD using various transformation operations, such as map(), filter(), and flatMap().
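For example (the values here are only illustrative):

rdd = sc.parallelize([1, 2, 3, 4, 5])

squared_rdd = rdd.map(lambda x: x * x)                 # new RDD derived from rdd
even_squares_rdd = squared_rdd.filter(lambda x: x % 2 == 0)

print(even_squares_rdd.collect())  # Output: [4, 16]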

iv). Spark RDD Operations

Here are some common RDD operations:

1. count()

Returns the number of elements in the RDD.

rdd = sc.parallelize([1, 2, 3, 4, 5])
count = rdd.count()
print(count)  # Output: 5

2. foreach()

Applies a function to each element in the RDD.

rdd = sc.parallelize([1, 2, 3, 4, 5])
rdd.foreach(lambda x: print(x))  # note: in a cluster the output appears in the executors' logs, not on the driver

3. collect()

Returns all elements in the RDD as a Python list.

rdd = sc.parallelize([1, 2, 3, 4, 5])

data = rdd.collect()

print(data)  # Output: [1, 2, 3, 4, 5]

4. join()

Joins two RDDs based on a common key.

rdd1 = sc.parallelize([(1, "John"), (2, "Jane")])

rdd2 = sc.parallelize([(1, 25), (2, 30)])

joined_rdd = rdd1.join(rdd2)
print(joined_rdd.collect())  # Output: [(1, ('John', 25)), (2, ('Jane', 30))]

5. cache()

Caches the RDD in memory for faster access.

rdd = sc.parallelize([1, 2, 3, 4, 5])

rdd.cache()  # marks the RDD to be kept in memory for reuse
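Note that cache() is lazy: the data is only materialized in memory when the first action runs. A small illustrative sketch:

rdd = sc.parallelize(range(1, 1000001))
rdd.cache()

print(rdd.count())  # first action: computes the RDD and stores its partitions in memory
print(rdd.count())  # second action: reads the cached partitions instead of recomputing them
rdd.unpersist()     # releases the cached data once it is no longer needed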

EXPERIMENT-11:

Perform PySpark transformations

(i) map and flatMap

(ii) Remove the words that are not necessary for analyzing this text.

(iii) groupBy

(iv) What if we want to calculate how many times each word occurs in the corpus?

(v) How do I perform a task (say, counting the words 'spark' and 'apache' in rdd3) separately on each partition and get the output of the task performed on these partitions?

(vi) Union of RDDs

(vii) Join two pairs of RDDs based upon their key

(i) Map and FlatMap

map(): Applies a function to each element in the RDD and returns a new RDD with the results.

rdd = sc.parallelize([1, 2, 3, 4, 5])

mapped_rdd = rdd.map(lambda x: x * 2)

print(mapped_rdd.collect())  # Output: [2, 4, 6, 8, 10]

flatMap(): Similar to map(), but each input element can be mapped to zero or more output elements.

rdd sc.parallelize(["hello world", "spark apache"])

flat mapped_rdd rdd.flatMap(lambda xxsplit())

print(flat_mapped_rdd.collect()) # Output: ['hello', 'world', 'spark', 'apache']

(ii) Removing Unnecessary Words

To remove unnecessary words, you can use the filter() transformation.

rdd sc parallelize(["hello world", "spark apache", "the quick brown fox"])

filtered rdd rdd filter(lambda xx not in ("the", "quick", "brown", "fox"]) print(filtered_rdd.collect()) # Output: ['hello world', 'spark apache')

(iii) GroupBy

The groupBy() transformation groups the elements of the RDD by a key function.

rdd sc.parallelize([("spark", 1), ("apache", 2), ("spark", 3)])

grouped rdd rdd groupBy(lambda x: x[0]) print(grouped.rdd.collect()) # Output: ((spark, [<tuple>, <tuple>), ('apache. (<tuple>]]))]))])

(iv) Counting Word Frequencies

To count the frequency of each word, you can use the flatMap(), map(), reduceByKey(), and sortBy() transformations.

rdd = sc.parallelize(["hello world", "spark apache", "hello world"])

word_rdd = rdd.flatMap(lambda x: x.split()).map(lambda x: (x, 1))

word_count_rdd = word_rdd.reduceByKey(lambda x, y: x + y)

sorted_word_count_rdd = word_count_rdd.sortBy(lambda x: x[1], ascending=False)

print(sorted_word_count_rdd.collect())
# Output: [('hello', 2), ('world', 2), ('spark', 1), ('apache', 1)]

(v) Performing Tasks on Each Partition

To perform a task on each partition, you can use the mapPartitions() transformation.

rdd = sc.parallelize([1, 2, 3, 4, 5], 2)

def process_partition(partition):
    # Perform the task on this partition (here: sum its elements)
    return [sum(partition)]

result_rdd = rdd.mapPartitions(process_partition)

print(result_rdd.collect())  # Output: [3, 12]
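To answer (v) more directly, counting the words 'spark' and 'apache' separately in each partition, one option is the mapPartitionsWithIndex() transformation. The RDD name rdd3 and its contents below are illustrative assumptions:

rdd3 = sc.parallelize(["spark is fast", "apache spark", "apache hadoop", "spark core"], 2)

def count_keywords(index, partition):
    # count 'spark' and 'apache' within this partition only
    counts = {"spark": 0, "apache": 0}
    for line in partition:
        for word in line.split():
            if word in counts:
                counts[word] += 1
    yield (index, counts)

print(rdd3.mapPartitionsWithIndex(count_keywords).collect())
# Output: [(0, {'spark': 2, 'apache': 1}), (1, {'spark': 1, 'apache': 1})]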

(vi) Union of RDDs

The union() transformation returns a new RDD containing all elements from both of the original RDDs.

rdd1 = sc.parallelize([1, 2, 3])
rdd2 = sc.parallelize([4, 5, 6])

union_rdd = rdd1.union(rdd2)
print(union_rdd.collect())  # Output: [1, 2, 3, 4, 5, 6]

(vii) Joining Two Pairs of RDDs

The join() transformation returns a new RDD containing all pairs of elements with matching keys.

rdd1 = sc.parallelize([("spark", 1), ("apache", 2)])

rdd2 = sc.parallelize([("spark", 3), ("hadoop", 4)])

joined_rdd = rdd1.join(rdd2)
print(joined_rdd.collect())  # Output: [('spark', (1, 3))]