Aplanar JSON es una expresión comúnmente utilizada y entendida en el ámbito de procesamiento de datos. Sin embargo, también podrás encontrar expresiones como "desnormalizar JSON" o "convertir JSON anidado a plano" que se utilizan para describir el mismo concepto. En esencia, se refiere al proceso de convertir una estructura JSON anidada en una estructura más plana o tabular, donde los objetos anidados se descomponen en columnas en lugar de mantener su estructura jerárquica.
A continuación, te mostramos el código a utilizar para aplanar JSON:
from pyspark.sql.types import StructType
from pyspark.sql.functions import explode
def flatten(schema, prefix=None):
fields = []
for field in schema.fields:
name = prefix + '.' + field.name if prefix else field.name
dtype = field.dataType
if isinstance(dtype, StructType):
fields += flatten(dtype, prefix=name)
else:
fields.append(name)
return fields
def explodeDF(df):
for (name, dtype) in df.dtypes:
if "array" in dtype:
df = df.withColumn(name, explode(name))
return df
def df_is_flat(df):
for (_, dtype) in df.dtypes:
if ("array" in dtype) or ("struct" in dtype):
return False
return True
def flatJson(jdf):
keepGoing = True
while(keepGoing):
fields = flatten(jdf.schema)
new_fields = [item.replace(".", "_") for item in fields]
jdf = jdf.select(fields).toDF(*new_fields)
jdf = explodeDF(jdf)
if df_is_flat(jdf):
keepGoing = False
return jdf
Ahora, analicemos el código:
Con las primeras dos filas, importamos las clases StructType
de pyspark.sql.types
y la función explode
de pyspark.sql.functions
. Estas son utilizadas para trabajar con estructuras de datos y realizar la operación de "explode" en un DataFrame, respectivamente.
Seguidamente, se define una función llamada flatten
que toma un esquema y un prefijo opcional como entrada y devuelve una lista de nombres de campos aplanados, iterando a través de los campos del esquema. Si el tipo de dato del campo es una estructura StructType
, la función se llama recursivamente con el nuevo prefijo. En cambio, si no es una estructura, se agrega el nombre del campo (con el prefijo si está presente) a la lista fields
. Finalmente, se devuelve la lista con return fields
A continuación, encontramos la definición de una función llamada explodeDF
que toma un DataFrame y realiza la operación de "explotar" las columnas que son de tipo array, utilizando un bucle para iterar a través de las columnas del DataFrame y, si una columna es de tipo array
, utilizar la función explode
para "explotar" dicha columna en filas separadas.
Luego, se define una función llamada df_is_flat
que toma un DataFrame y verifica si contiene columnas de tipo array
o struct
, devolviendo False
si encuentra alguna columna de estos tipos e indicando que el DataFrame no está aplanado o True
si no encuentra tales columnas.
Terminamos con una llamada a la función flatJson
que toma un DataFrame JSON y realiza el proceso de aplanamiento, utilizando un bucle while
para repetir el proceso hasta que el DataFrame esté completamente aplanado. Respecto a este while
, detallamos a continuación el proceso que sigue:
· Obtiene los nombres de los campos aplanados llamando a la función flatten
en el esquema del DataFrame.
· Reemplaza los puntos en los nombres de los campos con guiones bajos.
· Selecciona las columnas del DataFrame con los nombres de los campos aplanados y le asigna nuevos nombres.
· Realiza la operación de "explotar" en las columnas de tipo array
llamando a la función explodeDF
.
· Verifica si el DataFrame resultante es completamente plano llamando a la función df_is_flat
.
· Después de completar el proceso descrito, se devuelve el DataFrame resultante con return jdf
.
¿Quieres conocer en detalle todos los sistemas de almacenes de datos disponibles? ¡Adelante!
En esta sección encontrarás conceptos relacionados con la certificación Microsoft Azure Data Fundamentals.