# import packs import numpy as np import pandas as pd import scanpy as sc import gc import torch import torch.nn.functional as F import umap import anndata from sklearn.preprocessing import StandardScaler from sklearn.metrics.cluster import adjusted_rand_score from sklearn.metrics import normalized_mutual_info_score from sklearn.feature_extraction.text import TfidfTransformer from scipy import io from scipy import sparse import matplotlib.pyplot as plt ### FUNCTION ### ################ def transcriptomicAnalysis( path_10x, bcode_variants, high_var_genes_min_mean=0.015, high_var_genes_max_mean=3, high_var_genes_min_disp=0.75, min_genes=200, min_cells=3, max_pct_mt=100, n_neighbors=20, n_pcs=30, # se None → niente PCA transcript_key='trans', manual_matrix_reading=False ): """ Preprocess transcriptomic (RNA) data from 10x Genomics format. If n_pcs=None, skip PCA and use full (normalized) expression matrix as embedding. """ import scanpy as sc import anndata import pandas as pd import numpy as np import scipy.sparse as sp from scipy import io, sparse from sklearn.preprocessing import StandardScaler print("[INFO] === TRANSCRIPTOMIC ANALYSIS START ===") # === Lettura dati RNA === if manual_matrix_reading: rna = path_10x + '/matrix.mtx.gz' barcode = path_10x + '/barcodes.tsv.gz' feat = path_10x + '/features.tsv.gz' rna = io.mmread(rna) B = rna.todense().T barcode = pd.read_csv(barcode, sep='\t', header=None, names=['bcode']) barcode.index = barcode.iloc[:, 0] barcode = barcode.drop('bcode', axis=1) feat = pd.read_csv(feat, sep='\t', header=None, names=['gene_ids', 'gene_symbol', 'feature_types']) feat.index = feat['gene_symbol'] feat = feat.drop('gene_symbol', axis=1) adata = anndata.AnnData(X=B, obs=barcode, var=feat) adata.X = sparse.csr_matrix(adata.X) else: adata = sc.read_10x_mtx(path_10x, var_names='gene_symbols', cache=True) # === Filtra barcode presenti anche nel file varianti === bcode_var = pd.read_csv(bcode_variants, sep='\t', header=None)[0] adata = adata[adata.obs.index.isin(bcode_var)] # === Filtraggio e QC === sc.pp.filter_cells(adata, min_genes=min_genes) sc.pp.filter_genes(adata, min_cells=min_cells) adata.var['mt'] = adata.var_names.str.startswith('MT-') sc.pp.calculate_qc_metrics(adata, qc_vars=['mt'], percent_top=None, log1p=False, inplace=True) adata = adata[adata.obs.pct_counts_mt < max_pct_mt, :] # === Normalizzazione e log === sc.pp.normalize_total(adata, target_sum=1e4) sc.pp.log1p(adata) # ~_~T FIX: rimuove eventuali NaN generati da log1p if sp.issparse(adata.X): adata.X.data[np.isnan(adata.X.data)] = 0 else: adata.X = np.nan_to_num(adata.X, nan=0.0, posinf=0.0, neginf=0.0) # === Selezione geni altamente variabili === sc.pp.highly_variable_genes( adata, min_mean=high_var_genes_min_mean, max_mean=high_var_genes_max_mean, min_disp=high_var_genes_min_disp ) # === Salva matrice originale === adata.layers["trans_raw"] = adata.X.copy() # === PCA opzionale === if n_pcs is not None: print(f"[INFO] PCA con {n_pcs} componenti...") sc.pp.pca(adata, n_comps=n_pcs, use_highly_variable=True) sc.pp.neighbors(adata, n_neighbors=n_neighbors, n_pcs=n_pcs, random_state=42) sc.tl.umap(adata, random_state=42) adata.obsm[transcript_key + '_umap'] = adata.obsm['X_umap'] embedding = adata.obsm['X_pca'] else: print("[INFO] PCA disabilitata → uso matrice intera (tutte le feature normalizzate).") embedding = adata.X.toarray() if sp.issparse(adata.X) else adata.X # === Scaling per uniformare range numerico === scaler = StandardScaler(with_mean=False) # evita densificazione su sparse adata.uns[transcript_key + '_X'] = scaler.fit_transform(embedding) print(f"[INFO] Salvato embedding → adata.uns['{transcript_key}_X'] shape={adata.uns[transcript_key + '_X'].shape}") print("[INFO] === TRANSCRIPTOMIC ANALYSIS DONE ===") return adata def _prep_embedding_for_training(X): """ Z-score per feature, poi L2 per cella. Restituisce float32 numpy. """ import numpy as np from sklearn.preprocessing import StandardScaler, normalize X = np.asarray(X, dtype=np.float32) X = StandardScaler(with_mean=True, with_std=True).fit_transform(X) X = normalize(X, norm="l2", axis=1) return X class PairedAE(torch.nn.Module): """ Due encoder (RNA/VAR) e due decoder con bottleneck condiviso (latent_dim). Loss: ricostruzione + allineamento (cosine) tra z_rna e z_var. """ def __init__(self, dim_a, dim_b, latent_dim=32, hidden=256): super().__init__() self.encoder_a = torch.nn.Sequential( torch.nn.Linear(dim_a, hidden), torch.nn.ReLU(), torch.nn.Linear(hidden, latent_dim), ) self.encoder_b = torch.nn.Sequential( torch.nn.Linear(dim_b, hidden), torch.nn.ReLU(), torch.nn.Linear(hidden, latent_dim), ) self.decoder_a = torch.nn.Sequential( torch.nn.Linear(latent_dim, hidden), torch.nn.ReLU(), torch.nn.Linear(hidden, dim_a), ) self.decoder_b = torch.nn.Sequential( torch.nn.Linear(latent_dim, hidden), torch.nn.ReLU(), torch.nn.Linear(hidden, dim_b), ) def forward(self, xa, xb): za = self.encoder_a(xa) zb = self.encoder_b(xb) ra = self.decoder_a(za) rb = self.decoder_b(zb) return za, zb, ra, rb def train_paired_ae( Xa, Xb, latent_dim=32, hidden=256, batch_size=256, num_epoch=300, lr=1e-3, lam_align=0.5, # forza allineamento lam_recon_a=1.0, # ricostruzione RNA lam_recon_b=3.0, # ricostruzione VAR lam_cross=2.0, # nuova: RNA→VAR consistency verbose=True, ): """ Paired Autoencoder asimmetrico: RNA = teacher, VAR = student. Loss = recA + recB + cross(A→B) + cosine alignment. """ import torch, torch.nn.functional as F, numpy as np Xa = _prep_embedding_for_training(Xa) Xb = _prep_embedding_for_training(Xb) Xa, Xb = np.asarray(Xa, np.float32), np.asarray(Xb, np.float32) n, da = Xa.shape _, db = Xb.shape class AsymAE(torch.nn.Module): def __init__(self, da, db, latent_dim, hidden): super().__init__() self.enc_a = torch.nn.Sequential( torch.nn.Linear(da, hidden), torch.nn.ReLU(), torch.nn.Dropout(0.1), torch.nn.Linear(hidden, latent_dim) ) self.enc_b = torch.nn.Sequential( torch.nn.Linear(db, hidden), torch.nn.ReLU(), torch.nn.Linear(hidden, latent_dim) ) self.dec_a = torch.nn.Sequential( torch.nn.Linear(latent_dim, hidden), torch.nn.ReLU(), torch.nn.Linear(hidden, da) ) self.dec_b = torch.nn.Sequential( torch.nn.Linear(latent_dim, hidden), torch.nn.ReLU(), torch.nn.Linear(hidden, db) ) def forward(self, xa, xb): za = self.enc_a(xa) zb = self.enc_b(xb) ra = self.dec_a(za) rb = self.dec_b(zb) return za, zb, ra, rb device = torch.device("cpu") model = AsymAE(da, db, latent_dim, hidden).to(device) opt = torch.optim.Adam(model.parameters(), lr=lr) cos = torch.nn.CosineEmbeddingLoss(margin=0.0) Xa_t, Xb_t = torch.tensor(Xa), torch.tensor(Xb) for ep in range(1, num_epoch + 1): perm = torch.randperm(n) total = 0 for i in range(0, n, batch_size): idx = perm[i:i+batch_size] xa, xb = Xa_t[idx].to(device), Xb_t[idx].to(device) za, zb, ra, rb = model(xa, xb) rec_a = F.mse_loss(ra, xa) rec_b = F.mse_loss(rb, xb) cross = F.mse_loss(zb, za.detach()) # VAR deve seguire RNA y = torch.ones(za.shape[0], device=device) align = cos(za, zb, y) loss = lam_recon_a*rec_a + lam_recon_b*rec_b + lam_cross*cross + lam_align*align opt.zero_grad() loss.backward() opt.step() total += loss.item() if verbose and (ep % max(1, num_epoch//10) == 0 or ep==1): print(f"[AE] Epoch {ep:04d}/{num_epoch} | loss={total:.3f} | recA={rec_a:.3f} recB={rec_b:.3f} cross={cross:.3f}") with torch.no_grad(): zA, zB, _, _ = model(Xa_t, Xb_t) zA = F.normalize(zA, dim=1).cpu().numpy() zB = F.normalize(zB, dim=1).cpu().numpy() return model, zA, zB def variantAnalysis( adata, matrix_path=None, bcode_path=None, variants_path=None, min_cells=10, min_counts=20, variant_filter_level="Norm", n_pcs=50, # PCA facoltativa per ridurre rumore variant_rep="tfidf", # 'tfidf' | 'binary' | 'lognorm' ): """ Prepara la vista VAR a partire da adata.X (counts/binari). Applica filtro opzionale, rappresentazione scelta, scaling e PCA (facoltativa). Salva l'embedding in adata.uns['variant_X'] e genera variant_umap per QC. """ import scanpy as sc import numpy as np from scipy import sparse from sklearn.preprocessing import StandardScaler from sklearn.feature_extraction.text import TfidfTransformer print("[INFO] === VARIANT ANALYSIS START ===") print(f"[INFO] Filtraggio: {variant_filter_level} | Rappresentazione: {variant_rep} | PCA: {n_pcs}") X = adata.X.copy() n_total = X.shape[1] # --- soglie filtro --- if variant_filter_level.lower() == "none": min_cells_eff, min_counts_eff = 0, 0 elif variant_filter_level.lower() == "low": min_cells_eff = max(1, int(min_cells * 0.3)) min_counts_eff = max(1, int(min_counts * 0.3)) else: # "Norm" min_cells_eff, min_counts_eff = min_cells, min_counts # --- filtro per varianti rare/deboli --- if sparse.issparse(X): counts_per_var = np.asarray(X.sum(axis=0)).ravel() cells_per_var = np.asarray((X > 0).sum(axis=0)).ravel() else: counts_per_var = X.sum(axis=0) cells_per_var = (X > 0).sum(axis=0) mask = (cells_per_var >= min_cells_eff) & (counts_per_var >= min_counts_eff) removed = int((~mask).sum()) if removed > 0: adata = adata[:, mask].copy() X = adata.X print(f"[INFO] Varianti filtrate: {removed}/{n_total} → kept {adata.n_vars}") # --- rappresentazione varianti --- if variant_rep == "binary": if sparse.issparse(X): X_rep = (X > 0).astype("float32") else: X_rep = (X > 0).astype("float32") elif variant_rep == "lognorm": # counts → log1p(normalized) if sparse.issparse(X): lib = np.asarray(X.sum(axis=1)).ravel() lib[lib == 0] = 1.0 X_norm = X.multiply(1.0 / lib[:, None]) X_rep = X_norm else: lib = X.sum(axis=1, keepdims=True) lib[lib == 0] = 1.0 X_rep = X / lib # log1p if sparse.issparse(X_rep): X_rep = X_rep.tocsr(copy=True) X_rep.data = np.log1p(X_rep.data) else: X_rep = np.log1p(X_rep) else: # "tfidf" (default, consigliato per binario/sparse) tfidf = TfidfTransformer(norm=None, use_idf=True, smooth_idf=True, sublinear_tf=False) if sparse.issparse(X): X_rep = tfidf.fit_transform(X) else: X_rep = tfidf.fit_transform(sparse.csr_matrix(X)) # normalizza per cella (L2) per comparabilità from sklearn.preprocessing import normalize X_rep = normalize(X_rep, norm="l2", axis=1) # --- scaling & PCA (facoltativa) --- if n_pcs is not None: sc.pp.scale(adata, zero_center=False) # solo per settare .var e compatibilità # PCA con arpack non accetta direttamente sparse: convertiamo from sklearn.decomposition import PCA, TruncatedSVD if sparse.issparse(X_rep): # SVD tronca più stabile sullo sparse svd = TruncatedSVD(n_components=n_pcs, random_state=42) embedding = svd.fit_transform(X_rep) else: pca = PCA(n_components=n_pcs, random_state=42) embedding = pca.fit_transform(X_rep) else: embedding = X_rep.toarray() if sparse.issparse(X_rep) else X_rep # --- salva embedding per integrazione + QC UMAP --- from sklearn.preprocessing import StandardScaler adata.uns["variant_X"] = StandardScaler(with_mean=True, with_std=True).fit_transform( np.asarray(embedding, dtype=np.float32) ) # QC UMAP (2D) solo per ispezione sc.pp.neighbors(adata, use_rep=None, n_neighbors=15, n_pcs=None) # non usato per variant_X import umap emb2d = umap.UMAP(n_neighbors=10, min_dist=0.05, random_state=42).fit_transform(adata.uns["variant_X"]) adata.obsm["variant_umap"] = emb2d print(f"[INFO] Salvato adata.uns['variant_X'] shape={adata.uns['variant_X'].shape}") print("[INFO] === VARIANT ANALYSIS DONE ===") return adata def calcOmicsClusters(adata, omic_key, res=0.5, n_neighbors=None, n_pcs=None): """ Compute clustering (Leiden) on a given omic embedding (trans, variant, int). Automatically adapts to 2D UMAPs or high-dimensional embeddings. """ import scanpy as sc import numpy as np # Default n_neighbors if n_neighbors is None: n_neighbors = 15 # --- Detect representation type --- use_rep = omic_key + "_umap" if use_rep in adata.obsm: # UMAP is 2D -> skip n_pcs print(f"[INFO] Using 2D UMAP embedding: {use_rep}") sc.pp.neighbors(adata, n_neighbors=n_neighbors, use_rep=use_rep, key_added=f"{omic_key}_neighbors") elif omic_key + "_X" in adata.uns: # High-dimensional representation saved in uns X = adata.uns[omic_key + "_X"] adata.obsm[omic_key + "_X"] = X dims = X.shape[1] print(f"[INFO] Using high-dimensional embedding: {omic_key}_X ({dims} dims)") sc.pp.neighbors( adata, n_neighbors=n_neighbors, n_pcs=min(n_pcs or dims, dims), use_rep=omic_key + "_X", key_added=f"{omic_key}_neighbors" ) else: raise ValueError(f"No valid representation found for omic_key='{omic_key}'") # --- Leiden clustering --- sc.tl.leiden( adata, resolution=res, key_added=f"{omic_key}_clust_{res}", neighbors_key=f"{omic_key}_neighbors" ) return adata def weightsInit(m): if isinstance(m, torch.nn.Linear): torch.nn.init.xavier_uniform(m.weight.data) m.bias.data.zero_() def omicsIntegration( adata, transcript_key='trans', variant_key='variant', integration_key='int', latent_dim=32, num_epoch=300, lam_align=0.5, lam_recon_a=1.0, lam_recon_b=1.0, lam_cross=2.0, seed=42, ): """ Integrazione RNA+VAR con autoencoder asimmetrico (RNA=teacher, VAR=student) + confronto CCA opzionale, sceglie automaticamente embedding migliore. """ import numpy as np from sklearn.metrics.pairwise import cosine_similarity from sklearn.cross_decomposition import CCA from sklearn.preprocessing import StandardScaler import umap, scanpy as sc print("[INFO] === OMICS INTEGRATION (Asymmetric AE + CCA) START ===") assert transcript_key + "_X" in adata.uns, f"Missing adata.uns['{transcript_key}_X']" assert variant_key + "_X" in adata.uns, f"Missing adata.uns['{variant_key}_X']" X_a = np.asarray(adata.uns[transcript_key + "_X"], dtype=np.float32) X_b = np.asarray(adata.uns[variant_key + "_X"], dtype=np.float32) assert X_a.shape[0] == X_b.shape[0], "RNA/VAR rows (cells) must match" # --- Bilancia varianze globali --- X_a /= np.std(X_a) X_b /= np.std(X_b) # --- 1️⃣ Autoencoder asimmetrico --- _, zA, zB = train_paired_ae( X_a, X_b, latent_dim=latent_dim, num_epoch=num_epoch, lam_align=lam_align, lam_recon_a=lam_recon_a, lam_recon_b=lam_recon_b, lam_cross=lam_cross, verbose=True, ) simAE = float(np.mean(np.sum(zA * zB, axis=1))) zAE = 0.5 * (zA + zB) # --- 2️⃣ CCA LINEARE --- Xa_s = StandardScaler(with_mean=True, with_std=True).fit_transform(X_a) Xb_s = StandardScaler(with_mean=True, with_std=True).fit_transform(X_b) cca = CCA(n_components=min(latent_dim, Xa_s.shape[1], Xb_s.shape[1])) Za, Zb = cca.fit_transform(Xa_s, Xb_s) Za = (Za / np.linalg.norm(Za, axis=1, keepdims=True).clip(1e-9)).astype(np.float32) Zb = (Zb / np.linalg.norm(Zb, axis=1, keepdims=True).clip(1e-9)).astype(np.float32) simCCA = float(np.mean(np.sum(Za * Zb, axis=1))) zCCA = 0.5 * (Za + Zb) # --- 3️⃣ Scelta embedding migliore --- if simCCA >= simAE: chosen, z, sim = "CCA", zCCA, simCCA else: chosen, z, sim = "AE", zAE, simAE # --- 4️⃣ Salva embedding e metriche --- adata.uns[integration_key + "_X"] = z.astype(np.float32) adata.uns[integration_key + "_metrics"] = { "simAE": simAE, "simCCA": simCCA, "chosen": chosen, "latent_dim": int(latent_dim), "num_epoch": int(num_epoch), "lam_align": float(lam_align), "lam_recon_b": float(lam_recon_b), "lam_cross": float(lam_cross), } # --- 5️⃣ Calcola UMAP finale --- um = umap.UMAP(n_neighbors=15, min_dist=0.05, random_state=seed).fit_transform(z) adata.obsm[integration_key + "_umap"] = um print(f"[INT] AE={simAE:.3f} | CCA={simCCA:.3f} → chosen: {chosen}") print("[INFO] === OMICS INTEGRATION DONE ===") return adata class pairedIntegration(torch.nn.Module): def __init__(self,input_dim_a=2000,input_dim_b=2000,clf_out=10): super(pairedIntegration, self).__init__() self.input_dim_a = input_dim_a self.input_dim_b = input_dim_b self.clf_out = clf_out self.encoder_a = torch.nn.Sequential( torch.nn.Linear(self.input_dim_a, 1000), torch.nn.BatchNorm1d(1000), torch.nn.ReLU(), torch.nn.Linear(1000, 512), torch.nn.BatchNorm1d(512), torch.nn.ReLU(), torch.nn.Linear(512, 128), torch.nn.BatchNorm1d(128), torch.nn.ReLU()) self.encoder_b = torch.nn.Sequential( torch.nn.Linear(self.input_dim_b, 1000), torch.nn.BatchNorm1d(1000), torch.nn.ReLU(), torch.nn.Linear(1000, 512), torch.nn.BatchNorm1d(512), torch.nn.ReLU(), torch.nn.Linear(512, 128), torch.nn.BatchNorm1d(128), torch.nn.ReLU()) self.clf = torch.nn.Sequential( torch.nn.Linear(128, self.clf_out), torch.nn.Softmax(dim=1)) self.feature = torch.nn.Sequential( torch.nn.Linear(128, 32)) def forward(self, x_a,x_b): out_a = self.encoder_a(x_a) f_a = self.feature(out_a) y_a = self.clf(out_a) out_b = self.encoder_b(x_b) f_b = self.feature(out_b) y_b = self.clf(out_b) return f_a,y_a,f_b,y_b def pairedIntegrationTrainer(X_a, X_b, model, batch_size = 512, num_epoch=5, f_temp = 0.1, p_temp = 1.0): device = torch.device("cpu") f_con = contrastiveLoss(batch_size = batch_size,temperature = f_temp) p_con = contrastiveLoss(batch_size = model.clf_out,temperature = p_temp) opt = torch.optim.SGD(model.parameters(),lr=0.01, momentum=0.9,weight_decay=5e-4) for k in range(num_epoch): model.to(device) n = X_a.shape[0] r = np.random.permutation(n) X_train_a = X_a[r,:] X_tensor_A=torch.tensor(X_train_a).float() X_train_b = X_b[r,:] X_tensor_B=torch.tensor(X_train_b).float() losses = 0 for j in range(n//batch_size): inputs_a = X_tensor_A[j*batch_size:(j+1)*batch_size,:].to(device) inputs_a2 = inputs_a + torch.normal(0,1,inputs_a.shape).to(device) inputs_a = inputs_a + torch.normal(0,1,inputs_a.shape).to(device) inputs_b = X_tensor_B[j*batch_size:(j+1)*batch_size,:].to(device) inputs_b = inputs_b + torch.normal(0,1,inputs_b.shape).to(device) feas,o,nfeas,no = model(inputs_a,inputs_b) feas2,o2,_,_ = model(inputs_a2,inputs_b) fea_mi = f_con(feas,nfeas)+f_con(feas,feas2) p_mi = p_con(o.T,no.T)+p_con(o.T,o2.T) loss = fea_mi + p_mi opt.zero_grad() loss.backward() opt.step() losses += loss.data.tolist() print("Total loss: "+str(round(losses,4))) gc.collect() class contrastiveLoss(torch.nn.Module): def __init__(self, batch_size, temperature=0.5): super().__init__() self.batch_size = batch_size self.register_buffer("temperature", torch.tensor(temperature)) self.register_buffer("negatives_mask", (~torch.eye(batch_size * 2, batch_size * 2, dtype=bool)).float()) def forward(self, emb_i, emb_j): # """ # emb_i and emb_j are batches of embeddings, where corresponding indices are pairs # z_i, z_j as per SimCLR paper # """ z_i = F.normalize(emb_i, dim=1,p=2) z_j = F.normalize(emb_j, dim=1,p=2) representations = torch.cat([z_i, z_j], dim=0) similarity_matrix = F.cosine_similarity(representations.unsqueeze(1), representations.unsqueeze(0), dim=2) sim_ij = torch.diag(similarity_matrix, self.batch_size) sim_ji = torch.diag(similarity_matrix, -self.batch_size) positives = torch.cat([sim_ij, sim_ji], dim=0) nominator = torch.exp(positives / self.temperature) denominator = self.negatives_mask * torch.exp(similarity_matrix / self.temperature) loss_partial = -torch.log(nominator / torch.sum(denominator, dim=1)) loss = torch.sum(loss_partial) / (2 * self.batch_size) return loss def distributionClusters(adata, control_cl, group_cl, perc_cell_to_show=0.1, figsize = (12,8), dpi=100, save_path=None): df = adata.obs.groupby([group_cl, control_cl]).size().unstack() df = df.loc[df.sum(axis=1)/df.values.sum()>=perc_cell_to_show,:] # remove row if group_cluster represents less cells in perc than perc_cell_to_show. df_rel = df df = df.div(df.sum(axis=1), axis=0) df[group_cl] = df.index plt.rcParams["figure.figsize"] = figsize plt.rcParams['figure.dpi'] = dpi plt.rcParams['figure.facecolor'] = '#FFFFFF' df.plot( x = group_cl, kind = 'barh', stacked = True, mark_right = True, ) leg = plt.legend(bbox_to_anchor=(1, 1), loc="upper left") leg.get_frame().set_edgecolor('black') plt.xlabel('perc_'+control_cl, loc='center') for n in df_rel: for i, (pos_x, ab, value) in enumerate(zip(df.iloc[:, :-1].cumsum(1)[n], df[n], df_rel[n])): if (value == 0) | (ab <=0.05): value = '' plt.text(pos_x-ab/2, i, str(value), va='center', ha='center') plt.grid(False) if save_path is not None: plt.savefig(save_path, bbox_inches='tight') return(plt.show()) # ================================================================ # 🔧 OUTPUT HANDLER: salva UMAP e AnnData in /output/ # ================================================================ import os import matplotlib.pyplot as plt import scanpy as sc def save_all_umaps(adata, prefix="output", color_by=None, dpi=300): """ Salva tutte le UMAP presenti in adata.obsm come immagini PNG. Non salva l'AnnData, nessun h5ad viene scritto. """ import os import scanpy as sc import matplotlib.pyplot as plt os.makedirs(prefix, exist_ok=True) print(f"[INFO] Cartella di output: {prefix}") # === Trova tutte le UMAP disponibili === umap_keys = [k for k in adata.obsm.keys() if k.endswith("_umap") or k == "X_umap"] if not umap_keys: print("[WARN] Nessuna UMAP trovata in adata.obsm") return print(f"[INFO] UMAP trovate: {umap_keys}") # === Determina cosa usare come colore === if color_by is None: cluster_cols = [c for c in adata.obs.columns if "clust" in c.lower()] color_by = cluster_cols if cluster_cols else ["n_genes"] elif isinstance(color_by, str): color_by = [color_by] print(f"[INFO] Colorazioni da usare: {color_by}") # === Salva ogni combinazione UMAP × colore === for key in umap_keys: for color in color_by: fig_path = os.path.join(prefix, f"{key}_{color}.png") try: sc.pl.embedding( adata, basis=key, color=color, frameon=False, show=False, ) plt.savefig(fig_path, dpi=dpi, bbox_inches="tight") plt.close() print(f"[OK] Salvata {fig_path}") except Exception as e: print(f"[WARN] Errore nel salvare {fig_path}: {e}") print("[✅] Tutte le UMAP salvate con successo.")