¡Ayuda al desarrollo del sitio, compartiendo el artículo con amigos!

Introducción a las transformaciones Spark

Una transformación es una función que devuelve un nuevo RDD modificando los RDD existentes. El RDD de entrada no se modifica ya que los RDD son inmutables. Spark ejecuta todas las transformaciones de forma perezosa: los resultados no se calculan de inmediato. El cálculo de las transformaciones ocurre solo cuando se realiza una determinada acción en el RDD.

Tipos de transformaciones en Spark

En términos generales, se clasifican en dos tipos:

  • Transformación estrecha: todos los datos necesarios para calcular registros en una partición residen en una partición del RDD principal. Ocurre en el caso de los siguientes métodos:

mapa(), planoMapa(), filtro(), muestra(), union() etc.

  • Transformación amplia: todos los datos necesarios para calcular registros en una partición residen en más de una partición en los RDD principales. Ocurre en el caso de los siguientes métodos:

distinct(), groupByKey(), reduceByKey(), join() , repartition() etc.

Ejemplos de transformaciones Spark

Aquí analizamos los ejemplos mencionados a continuación.

1. Transformaciones estrechas

  • map(): Esta función toma una función como parámetro y aplica esta función a cada elemento del RDD.

Código:

"val conf=nueva SparkConf().setMaster(local).setAppName(testApp)"
valor sc=SparkContext.getOrCreate(conf)
"sc.setLogLevel(ERROR)
"val rdd=sc.parallelize(Array(10,15,50,100))
"println(La base RDD es:)
">"rdd.foreach(x=print(x+ ))
imprimirln()
>val rddNuevo=rdd.map(x=x+10)
"println(RDD después de aplicar el método MAP:)
""rddNuevo.foreach(x=>print(x+ ))

Salida:

En el método MAP anterior, sumamos cada elemento por 10 y eso se refleja en el resultado.

  • FlatMap(): Es similar al mapa pero puede generar múltiples elementos de salida correspondientes a un elemento de entrada. Por lo tanto, la función debe devolver una secuencia en lugar de un solo elemento.

Código:

"val conf=nueva SparkConf().setAppName(prueba).setMaster(local)"
val sc=nuevo SparkContext(conf)
"sc.setLogLevel(ADVERTENCIA)
""val rdd=sc.parallelize(Array(1:2:3,4:5:6))
">"val rddNuevo=rdd.flatMap(x=x.split(:))
"rddNuevo.foreach(x=>print(x+ ))

Salida:

Esta función pasada como parámetro divide cada entrada por ":" y devuelve una matriz y el método FlatMap aplana la matriz.

  • filter(): Toma una función como parámetro y devuelve todos los elementos del RDD para los que la función devuelve verdadero.

Código:

"val conf=nueva SparkConf().setMaster(local).setAppName(testApp)"
valor sc=SparkContext.getOrCreate(conf)
"sc.setLogLevel(ERROR)
""val rdd=sc.parallelize(Array(com.whatsapp.prod,com.facebook.prod,com.instagram.prod,com.whatsapp.test))
""println(La base RDD es:)
">"rdd.foreach(x=print(x+ ))
imprimirln()
>"val rddNew=rdd.filter (x=!x.contains(test))
"println(RDD después de aplicar el método MAP:)
""rddNuevo.foreach(x=>print(x+ ))

Salida:

En el código anterior estamos tomando cadenas que no tienen la palabra "prueba".

  • sample(): Devuelve una fracción de los datos, con o sin reemplazo, usando una semilla generadora de números aleatorios dada (aunque esto es opcional).

Código:

"val conf=nueva SparkConf().setAppName(prueba).setMaster(local)"
val sc=nuevo SparkContext(conf)
"sc.setLogLevel(ADVERTENCIA)
"val rdd=sc.parallelize(Array(1,2,3,4,5,6,7,10,11,12,15,20,50))
val rddNuevo=rdd.muestra(falso,.5)
"rddNuevo.foreach(x=>print(x+ ))

Salida:

En el código anterior, obtenemos muestras aleatorias sin reemplazo.

  • union(): Devuelve la unión del RDD fuente y el RDD pasado como parámetro.

Código:

"val conf=nueva SparkConf().setAppName(prueba).setMaster(local)"
val sc=nuevo SparkContext(conf)
"sc.setLogLevel(ADVERTENCIA)
"val rdd=sc.parallelize(Array(1,2,3,4,5))
val rdd2=sc.parallelize(Array(-1,-2,-3,-4,-5))
val rddUnion=rdd.union(rdd2)
"rddUnion.foreach(x=>print(x+ ))

Salida:

El RDD rddUnion resultante contiene todos los elementos de rdd y rdd2.

2. Transformaciones amplias

  • distinct(): Este método devuelve los distintos elementos del RDD.

Código:

"val conf=nueva SparkConf().setMaster(local).setAppName(testApp)"
valor sc=SparkContext.getOrCreate(conf)
"sc.setLogLevel(ERROR)
"val rdd=sc.parallelize(Array(1,1,3,4,5,5,5))
"println(La base RDD es:)
">"rdd.foreach(x=print(x+ ))
imprimirln()
val rddNuevo=rdd.distinct()
"println(RDD después de aplicar el método MAP:)
""rddNuevo.foreach(x=>print(x+ ))

Salida:

estamos obteniendo los distintos elementos 4,1,3,5 en la salida.

  • groupByKey(): Esta función es aplicable a RDD por pares. Un RDD por pares es aquel en el que cada elemento es una tupla donde el primer elemento es la clave y el segundo elemento es el valor. Esta función agrupa todos los valores correspondientes a una tecla.

Código:

"val conf=nueva SparkConf().setAppName(prueba).setMaster(local)"
val sc=nuevo SparkContext(conf)
"sc.setLogLevel(ADVERTENCIA)
""val rdd=sc.parallelize(Array((a,1),(b,2),(a,3),(b,10),(a,100)))
"val rddNuevo=rdd.groupByKey()
"rddNuevo.foreach(x=>print(x+ ))

Salida:

Como era de esperar, todos los valores de las teclas "a" y "b" están agrupados.

  • reduceByKey(): Esta operación también se aplica a los RDD por pares. Agrega los valores para cada clave de acuerdo con un método reducido proporcionado que debe ser del tipo (v,v)=v.

Código:

"val conf=nueva SparkConf().setAppName(prueba).setMaster(local)"
val sc=nuevo SparkContext(conf)
"sc.setLogLevel(ADVERTENCIA)
""val rdd=sc.parallelize(Array((a,1),(b,2),(a,3),(b,10),(a,100),(c,50)))
">val rddNuevo=rdd.reduceByKey((x,y)=x+y )
"rddNuevo.foreach(x=>print(x+ ))

Salida:

En el caso anterior, sumamos todos los valores de una clave.

  • join(): La operación de unión se aplica a RDD por pares. El método de unión combina dos conjuntos de datos en función de la clave.

Código:

"val conf=nueva SparkConf().setMaster(local).setAppName(testApp)"
valor sc=SparkContext.getOrCreate(conf)
"sc.setLogLevel(ERROR)
""val rdd1=sc.parallelize(Array((key1,10),(key2,15),(key3,100)))
""val rdd2=sc.parallelize(Array((key2,11),(key2,20),(key1,75)))
"val rddJoined=rdd1.join(rdd2)
"println(RDD después de unirse:)
""rddJoined.foreach(x=>print(x+ ))

Salida:

  • repartition(): Reorganiza aleatoriamente los datos en el RDD en número de particiones pasadas como parámetro. Puede tanto aumentar como disminuir las particiones.

Código:

"val conf=nueva SparkConf().setAppName(prueba).setMaster(local)"
val sc=nuevo SparkContext(conf)
"sc.setLogLevel(ADVERTENCIA)
"val rdd=sc.parallelize(Array(1,2,3,4,5,10,15,18,243,50),10)
"println(Particiones antes: +rdd.getNumPartitions)
"val rddNuevo=rdd.repartición(15)
"println(Particiones después de: +rddNew.getNumPartitions)"

Salida:

En el caso anterior, estamos aumentando las particiones de 10 a 15.

¡Ayuda al desarrollo del sitio, compartiendo el artículo con amigos!