Python tips

Algunas casos (o cachos según como se vea) y las funciones de uso poco común pero que sacan de apuros cuando realmente se necesitan. Estos tips lo iré ampliando a medida que me encuentre con esos «apuros».

Caso1. Dividir un dataframe

A partir de un dataframe que contiene un total de 18 columnas se debe generar 8 dataframes de 4 columnas cada una, en que las dos primeras columnas son iguales para todos los dataframes y las restantes a partir de las columnas originales. Sin embargo, los primeros 4 dataframes corresponden a un estado distinto de los últimas 4. Por tanto se hará en dos etapas para agregar esta columna «estado» que no está en el dataframe original.

Finalmente se uniran todos en un único dataframe.

# las dos primeras columnas son comunes a todos los dataframes, siendo df el dataframe original:
cols_comun = df.columns[:2]

# crear 8 dataframes diferentes de 4 columnas cada una, las primeras 4 corresponden a un nuevo estado 'Cotizacion' y la otras 4 a 'Venta'
# dfs1 contiene las primeras 8 columnas
# dfs2 contiene las ultimas 8 columnas

dfs1 = [pd.concat([df[common_cols], df[[df.columns[i], 
    df.columns[i+1]]]], axis=1) for i in range(2, 9, 2)]
dfs2 = [pd.concat([df[common_cols], df[[df.columns[i], 
    df.columns[i+1]]]], axis=1) for i in range(10, 17, 2)]

# cambiar el nombre de las columnas para todos los frames
for i in range(4):
    dfs1[i].columns, dfs2[i].columns = ['idCotizacion', 'idOperacion', 'Tipo', 'Porcentaje']

# unimos los primeros 4 dataframe que se corresponden al estado 'Cotizacion'
df_finalc = pd.concat(dfs1, ignore_index=True)
df_finalc.insert(2, 'Estado','Cotizacion')

# unimos los últimos 4 dataframe que se corresponden al estado 'Venta'
df_finalr = pd.concat(dfs2, ignore_index=True)
df_finalr.insert(2, 'Estado','Venta')

# finalmente unimos ambos DataFrames y eliminamos las fila nulas de la última columna
df_final = pd.concat([df_finalc, df_finalr], ignore_index=True)
df_final = df_final.dropna(subset=df_final.columns[-1:], how='all')

# exportamos a excel
df_final.to_excel('df_final.xlsx', index=False)

Caso 2. Subir el contenido de un archivo csv a una tabla de una base de datos

Tenemos un archivo de datos en formato csv (o excel) y necesitamos subir esa información en una tabla dentro de base de datos (SQL Server). El primer paso es tener creada la tabla y su estructura, aunque no siempre es necesario, ya que desde SQLAlchemy si la tabla no existe esta se crea. Aunque en este últmo caso es recomendable determinar la estructura de datos con un esquema, para asegurar consistencia en cada recreación.

import os
import io
from pathlib import Path
from urllib.parse import urlparse
import pyodbc
import sqlalchemy as sal
from sqlalchemy import text
from sqlalchemy import create_engine
import pandas as pd

# datos de conexión a la base de datos, estos debiesen estar en otro archivo protegido o como variables de sistema
tablename = "PAGOS" # nombre de la tabla destinataria

server = 'server.windows.net'
database = 'database-server-data'
username = 'admin_read'
password = '123456'

conn_str = 'DRIVER={ODBC Driver 17 for SQL Server};SERVER='+server+';
            DATABASE='+database+';AUTOCOMMIT=FALSE;
            UID='+username+';PWD='+ password
engine = create_engine(f"mssql+pyodbc:///?odbc_connect={conn_str}", fast_executemany=True, use_insertmanyvalues=True)

# obtener los datos desde un archivo externo
# 
def cargar_datos(filename):
    if filenameis not None:
        # Leer las primeras líneas para determinar el separador
        try:
            # Leer las primeras líneas para deducir el separador
            contenido = filename.getvalue().decode("utf-8")
            lineas = contenido.split('\n')
            separador = ',' if lineas[0].find(',') > -1 else ';'

            # Usar el separador deducido para leer el DataFrame
            df = pd.read_csv(filename, sep=separador)
            return df
        except Exception as e:
            st.error(f"Error al leer el archivo: {e}")

# version corta para evaluar si el archivo tiene ',' ó ';' como separador
#
def load_data_csv(filename):
    if archivo is not None:
        try:
            data = pd.read_cvs(filename, sep=",", low_memory=False, index_col=0)
        except:
            data = pd.read_cvs(filename, sep=";", low_memory=False, index_col=0)
        return data

# probablemente haya que hacer transformaciones (para asegurarse el formato a la tabla destino)
df['FECHA'] = pd.to_datetime(df['FECHA'], format="%d/%m/%Y")
df['VENCIMIENTO'] = pd.to_datetime(df['VENCIMIENTO'], format="%d/%m/%Y")
df['UF_PACTADO'] = df['UF_PACTADO'].astype(float)
df['UF_PAGADO'] = df['UF_PAGADO'].astype(float)
df['PESO_PAGADO'] = df['PESO_PAGADO'].astype(float)

# si la tabla destino ya existe se limpia
with engine.connect() as conn:
    result = conn.execute(text("TRUNCATE TABLE "+tablename))
    conn.commit()

# carga de datos con sqlalchemy
start_time = time() # solo para determinar tiempo de carga

df.to_sql(tablename, engine, if_exists='append', index=False, method='multi')

elapsed_time = time() - start_time
print("Tiempo: %.10f seconds." % elapsed_time)

Caso 3. Homologar nombres de columnas y tipos

Desde un archivo csv o Excel, cargar la data a una tabla de base de datos (SQL Server) con los nombres de columnas corregidos y los tipos de datos aceptados por SQL Server.

# establecemos una funcion para controlar que los nombres de columnas no contengan espacios o caracteres anormales no permitidos en la base de datos.
# un segunda función para homologar los tipos de datos predeterminados de Python hacia los tipos admitidos de SQL.

define sql_column_name(dataframe):
    data.columns = [x.lower().replace(" ","_") for x in dataframe.columns]

define sql_types():
    replacepsql = {
        'object': 'varchar'
        'float64': 'float'
        'int64': 'int'
        'datetime64': 'timestamp'
        'timedelta64[ns]': 'varchar'
    }
    return replacesql

Caso 4. Cargar archivos desde un FTP hacia Azure Blob Storage

No lo he probado con muchos archivos, pero funciona,

# carga de archivos CSV sobre Azure storage
from ftplib import FTP
import os.path
import os
import logging
from azure.storage.blob import BlobClient
#from azure.storage.blob.aio import BlobClient

# por seguridad los parametros de conexion al storage y ftp debienes estar en variables de ambiente
# pensado para correr desde AZURE DATA STUDIO
# si lo corres local puedes colocar directamente las variables de contraseñas

connect_str = os.environ["AZURE_STORAGE_CONNECTION_STRING"]

def init_FTP():
    Host = os.environ["ftp_host"]
    Puerto = os.environ["ftp_port"]
    Cuenta = os.environ["ftp_cuenta"]
    Pass = os.environ["ftp_pwd"]
    connFTP = [Host, Puerto, Cuenta, Pass]
    return connFTP

# FTP
def ftp2blob():
    connFTP = init_FTP()
    Host = str(connFTP[0])
    Puerto = int(connFTP[1])
    Cuenta = str(connFTP[2])
    Pass = str(connFTP[3])

    ftp = FTP(Host)
    ftp.connect(Host, Puerto, timeout=1000)
    ftp.login(user=Cuenta, passwd=Pass)
    
    files = ftp.nlst()

    logging.info('Se inicia descarga desde FTP')

    for j in range(len(files)):
        filename = files[j]
        try:
            local_filename = os.path.join(r"C:/Users/paranedag/Downloads/temporal", filename)
            lf = open(local_filename, "wb")
            ftp.retrbinary("RETR " + filename, lf.write)
            lf.close()

            blob = BlobClient.from_connection_string(conn_str=connect_str, 
                    container_name='AZURECONTAINER', blob_name=filename)
            with open(file=local_filename, mode="rb") as file:
                blob.upload_blob(file, overwrite=True)

            logging.info("LISTA DESCARGA "+filename)
        except ValueError:
            logging.info("Error " + ValueError)
    logging.info('Término de descarga desde FTP')

# ejecutar FTP
ftp2blob()

Caso 5. Descargar de un SSH SFTP Azure

Acá es importante agregar max_concurrent_prefetch_requests=90 para evitar caídas del servidor cpn archivos grandes.

# import paramiko

def azure_download_ftp():
    host = "blob.core.windows.net"
    port = 22
    username = ""
    password = ""

    lista_archivos = ['GESTION.csv', 'PROYECTO.csv', 'SECUNDARIO.csv',
                      'POST_VENTA.csv', 'CTACTE.csv', 'OPERACION.csv']
    # la lista_archivos se puede modificar para listar todos los archivos de un directorio
    try:
        ssh = paramiko.SSHClient()
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        ssh.connect(host, port, username, password)

        # Crear una instancia de cliente SFTP
        sftp = ssh.open_sftp()

        for archivo in lista_archivos:
            localfile = os.path.join(r"/tmp/", archivo)
            try:
                sftp.get(archivo, localfile,
                         max_concurrent_prefetch_requests=90)
                print(
                    f"Archivo '{archivo}' descargado con éxito")
            except Exception as e:
                print(f"Error al descargar {archivo}: {e}")

        # Cerrar la sesión SFTP y la conexión SSH
        sftp.close()
        ssh.close()
    except Exception as e:
        print(f"Error al conectar y descargar el archivo: {e}")

Caso 6. Subir archivos hacia Azure Blob Storage como SFTP

Carga los archivos de una carpeta ‘data’ hacia Azure Blob Storage. Este está configurado como servidor SFTP.

# import paramiko
# from paramiko import SFTPClient

def azure_upload_ftp():
    host = "blob.core.windows.net"
    port = 22
    username = ""
    password = ""
    local_dir = 'data'
    remote_dir = '/'

    transport = paramiko.Transport((host, port))
    transport.connect(username=username, password=password)
    sftp = SFTPClient.from_transport(transport)

    # remote directory exists?
    try:
        sftp.chdir(remote_dir)
    except IOError:
        sftp.mkdir(remote_dir)
        sftp.chdir(remote_dir)

    for root, dirs, files in os.walk(local_dir):
        for file in files:
            local_path = os.path.join(root, file)
            relative_path = os.path.relpath(local_path, local_dir)
            remote_path = os.path.join(remote_dir, relative_path)

            remote_path_directory = os.path.dirname(remote_path)
            try:
                sftp.chdir(remote_path_directory)
            except IOError:
                sftp.makedirs(remote_path_directory)
                sftp.chdir(remote_path_directory)

            sftp.put(local_path, remote_path)
            print(f'Uploaded {local_path} to {remote_path}')

    sftp.close()
    transport.close()

Tips

Detectar variables numéricas y categóricas

# variables categoricas en el dataframe "data"
categoricas = data.select_dtypes(include='object')

# variables numericas en el dataframe "data"
numericas = data.select_dtypes(exclude='object')

# numero de columnas detectadas y cuales son
print(f'Numericas: {numericas.shape[1]}')
print(f'Variables: {numericas.columns}')

Cambiar nombre de columnas.

# basado en la posición de columnas
df = df1rename(columns={2: 'Tipo', 3: 'Porcentaje'})

# otra opción para lo mismo anterior
df.rename({df.column[0]='nuevo nombre'})

Eliminar columnas no numéricas de un dataframe.

# detectar columnas de tipo numericas
numericas = data.select_dtypes(exclude='object')
columnas = list(numericas.columns)

# crear un nuevo dataframe a partir de data con solo columnas numericas
ds = data[data.columns.intersection(columnas)]

Eliminar todas las columnas de un dataframe a excepción de algunas.

# eliminar todas las columnas menos col2 y col6
df = df[['col2', 'col6']]

# idem anterior usando loc
df = df.loc[:, ['col2', 'col6']]

Eliminar columnas de un dataframe.

# funcion general
DataFrame.drop(labels=None, axis=0, index=None, columns=None, level=None, inplace=False, errors='raise')

# remover una columna
df = df.drop(columns=['col1'], axis=1)

# remover columnas col1 y col4
df.drop(columns=['col1', 'col4'], inplace=True, axis=1)
df.drop(columns=df.columns[1:3], inplace=True, axis=1)

Eliminar columnas duplicadas.

#remover columnas duplicadas
df.T.drop_duplicates().T

Excluir columnas en un dataframe.

#excluir column1
df.loc[:, df.columns!='column1']

#excluir column1, column2, ...
df.loc[:, ~df.columns.isin(['column1', 'column2', ...])]

Eliminar filas duplicadas. drop_duplicates() elimina todos los duplicados excepto el primero.

df.drop_duplicates(subset=None, keep=’first’, inplace=False)

# elimina los duplicados basados en todas las columnas
df.drop_duplicates()

# elimina filas duplicadas basados en el contenido de las columnas col1 y col4
df.drop_duplicates(subset=['col1', 'col4'])

Parámetros:
subset: Cuales columnas a considerar para identificar los duplicados. 
Por defecto considera todas las columnas.
keep: Indica cuales duplicados mantener.
 - first: elimina todas las filas duplicadas excepto la primera.
 - last: elimina todas las filas duplicadas excepto la última.
 - False: elimina todos los duplicados.
inplace: True hace el cambio en el mismo df, False crea una copia del dataframe.

Agregar una fila al final de un dataframe.

# df y df2 deben tener el mismo numero y nombre de columnas
df = df.append(df2, ignore_index = True)