# import packs
import numpy as np
import pandas as pd
import scanpy as sc
import gc
import torch
import torch.nn.functional as F
import umap
import anndata
from sklearn.preprocessing import StandardScaler
from sklearn.metrics.cluster import adjusted_rand_score
from sklearn.metrics import normalized_mutual_info_score
from sklearn.feature_extraction.text import TfidfTransformer
from scipy import io
from scipy import sparse
import matplotlib.pyplot as plt


### FUNCTION ###
################

def transcriptomicAnalysis(
    path_10x,
    bcode_variants,
    high_var_genes_min_mean=0.015,
    high_var_genes_max_mean=3,
    high_var_genes_min_disp=0.75,
    min_genes=200,
    min_cells=3,
    max_pct_mt=100,
    n_neighbors=20,
    n_pcs=30,                 # if None → skip PCA
    transcript_key='trans',
    manual_matrix_reading=False
):
    """
    Preprocess transcriptomic (RNA) data from 10x Genomics format.
    If n_pcs=None, skip PCA and use full (normalized) expression matrix as embedding.
    """
    import scanpy as sc
    import anndata
    import pandas as pd
    import numpy as np
    import scipy.sparse as sp
    from scipy import io, sparse
    from sklearn.preprocessing import StandardScaler

    print("[INFO] === TRANSCRIPTOMIC ANALYSIS START ===")

    # === Read RNA data ===
    if manual_matrix_reading:
        rna = path_10x + '/matrix.mtx.gz'
        barcode = path_10x + '/barcodes.tsv.gz'
        feat = path_10x + '/features.tsv.gz'
        rna = io.mmread(rna)
        B = rna.todense().T
        barcode = pd.read_csv(barcode, sep='\t', header=None, names=['bcode'])
        barcode.index = barcode.iloc[:, 0]
        barcode = barcode.drop('bcode', axis=1)
        feat = pd.read_csv(feat, sep='\t', header=None,
                           names=['gene_ids', 'gene_symbol', 'feature_types'])
        feat.index = feat['gene_symbol']
        feat = feat.drop('gene_symbol', axis=1)
        adata = anndata.AnnData(X=B, obs=barcode, var=feat)
        adata.X = sparse.csr_matrix(adata.X)
    else:
        adata = sc.read_10x_mtx(path_10x, var_names='gene_symbols', cache=True)

    # === Keep only barcodes also present in the variants file ===
    bcode_var = pd.read_csv(bcode_variants, sep='\t', header=None)[0]
    adata = adata[adata.obs.index.isin(bcode_var)].copy()

    # === Filtering and QC ===
    sc.pp.filter_cells(adata, min_genes=min_genes)
    sc.pp.filter_genes(adata, min_cells=min_cells)
    adata.var['mt'] = adata.var_names.str.startswith('MT-')
    sc.pp.calculate_qc_metrics(adata, qc_vars=['mt'], percent_top=None, log1p=False, inplace=True)
    adata = adata[adata.obs.pct_counts_mt < max_pct_mt, :].copy()

    # === Normalization and log transform ===
    sc.pp.normalize_total(adata, target_sum=1e4)
    sc.pp.log1p(adata)

    # FIX: remove any NaNs produced by log1p
    if sp.issparse(adata.X):
        adata.X.data[np.isnan(adata.X.data)] = 0
    else:
        adata.X = np.nan_to_num(adata.X, nan=0.0, posinf=0.0, neginf=0.0)

    # === Highly variable gene selection ===
    sc.pp.highly_variable_genes(
        adata,
        min_mean=high_var_genes_min_mean,
        max_mean=high_var_genes_max_mean,
        min_disp=high_var_genes_min_disp
    )

    # === Save the original matrix ===
    adata.layers["trans_raw"] = adata.X.copy()

    # === Optional PCA ===
    if n_pcs is not None:
        print(f"[INFO] PCA with {n_pcs} components...")
        sc.pp.pca(adata, n_comps=n_pcs, use_highly_variable=True)
        sc.pp.neighbors(adata, n_neighbors=n_neighbors, n_pcs=n_pcs, random_state=42)
        sc.tl.umap(adata, random_state=42)
        adata.obsm[transcript_key + '_umap'] = adata.obsm['X_umap']
        embedding = adata.obsm['X_pca']
    else:
        print("[INFO] PCA disabled → using the full matrix (all normalized features).")
        embedding = adata.X.toarray() if sp.issparse(adata.X) else adata.X

    # === Scaling to harmonize numeric ranges ===
    scaler = StandardScaler(with_mean=False)  # avoids densifying sparse input
    adata.uns[transcript_key + '_X'] = scaler.fit_transform(embedding)

    print(f"[INFO] Saved embedding → adata.uns['{transcript_key}_X'] "
          f"shape={adata.uns[transcript_key + '_X'].shape}")
    print("[INFO] === TRANSCRIPTOMIC ANALYSIS DONE ===")

    return adata
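

# --- Usage sketch (illustrative, not part of the pipeline): how transcriptomicAnalysis
# might be called. The paths below are placeholders, not files shipped with this script.
def _example_transcriptomic_run():
    """Minimal, hypothetical example of the RNA preprocessing step."""
    adata_rna = transcriptomicAnalysis(
        path_10x="data/filtered_feature_bc_matrix",   # hypothetical 10x folder
        bcode_variants="data/variant_barcodes.tsv",   # hypothetical barcode list (one barcode per line)
        n_pcs=30,              # set to None to skip PCA and use the full normalized matrix
        transcript_key="trans",
    )
    # The embedding used downstream ends up in adata_rna.uns["trans_X"]
    return adata_rna
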

def _prep_embedding_for_training(X):
    """
    Z-score each feature, then L2-normalize each cell. Returns a float32 numpy array.
    """
    import numpy as np
    from sklearn.preprocessing import StandardScaler, normalize
    X = np.asarray(X, dtype=np.float32)
    X = StandardScaler(with_mean=True, with_std=True).fit_transform(X)
    X = normalize(X, norm="l2", axis=1)
    return X


import torch
import torch.nn as nn
import torch.nn.functional as F


class PairedAE(nn.Module):
    """
    Paired autoencoder (RNA↔VAR) with separate encoders and decoders.
    Supports forward(xa, xb) -> (za, zb, ra, rb) and independent decoding:
    decode_a(zb), decode_b(za).
    """

    def __init__(self, input_dim_a, input_dim_b, latent_dim=32, hidden=256):
        super(PairedAE, self).__init__()
        # Encoder RNA
        self.enc_a = nn.Sequential(
            nn.Linear(input_dim_a, hidden),
            nn.ReLU(),
            nn.Linear(hidden, latent_dim)
        )
        # Encoder VAR
        self.enc_b = nn.Sequential(
            nn.Linear(input_dim_b, hidden),
            nn.ReLU(),
            nn.Linear(hidden, latent_dim)
        )
        # Decoder RNA
        self.dec_a = nn.Sequential(
            nn.Linear(latent_dim, hidden),
            nn.ReLU(),
            nn.Linear(hidden, input_dim_a)
        )
        # Decoder VAR
        self.dec_b = nn.Sequential(
            nn.Linear(latent_dim, hidden),
            nn.ReLU(),
            nn.Linear(hidden, input_dim_b)
        )

    def forward(self, xa, xb):
        za = self.enc_a(xa)
        zb = self.enc_b(xb)
        ra = self.dec_a(za)
        rb = self.dec_b(zb)
        return za, zb, ra, rb

    # 🔹 These two are needed for cross-reconstruction
    def decode_a(self, z):
        """Decode a latent vector into the RNA domain."""
        return self.dec_a(z)

    def decode_b(self, z):
        """Decode a latent vector into the VAR domain."""
        return self.dec_b(z)


def train_paired_ae(
    Xa, Xb,
    latent_dim=32,
    hidden=256,
    batch_size=256,
    num_epoch=300,
    lr=1e-3,
    lam_align=0.5,      # strength of the cosine alignment
    lam_recon_a=1.0,    # RNA reconstruction weight
    lam_recon_b=1.0,    # VAR reconstruction weight
    lam_cross=2.0,      # cross-reconstruction weight
    lam_corr=1.0,       # 🔥 new: latent correlation strength
    use_bernoulli_b=False,
    verbose=True,
):
    """
    Paired AutoEncoder with:
      - RNA/VAR reconstruction
      - cosine alignment
      - cross-reconstruction (A→B, B→A)
      - latent correlation (DeepCCA-style)
    """
    import torch
    import torch.nn.functional as F
    import numpy as np

    device = torch.device("cpu")
    Xa = np.asarray(Xa, dtype=np.float32)
    Xb = np.asarray(Xb, dtype=np.float32)
    n, da = Xa.shape
    _, db = Xb.shape

    # === Define model ===
    # NOTE: this local class shadows the module-level PairedAE and takes (da, db, hidden, latent_dim).
    class PairedAE(torch.nn.Module):
        def __init__(self, da, db, hidden=256, latent_dim=32):
            super().__init__()
            self.enc_a = torch.nn.Sequential(
                torch.nn.Linear(da, hidden),
                torch.nn.ReLU(),
                torch.nn.Linear(hidden, latent_dim),
            )
            self.enc_b = torch.nn.Sequential(
                torch.nn.Linear(db, hidden),
                torch.nn.ReLU(),
                torch.nn.Linear(hidden, latent_dim),
            )
            self.dec_a = torch.nn.Sequential(
                torch.nn.Linear(latent_dim, hidden),
                torch.nn.ReLU(),
                torch.nn.Linear(hidden, da),
            )
            self.dec_b = torch.nn.Sequential(
                torch.nn.Linear(latent_dim, hidden),
                torch.nn.ReLU(),
                torch.nn.Linear(hidden, db),
            )

        def forward(self, xa, xb):
            za = self.enc_a(xa)
            zb = self.enc_b(xb)
            ra = self.dec_a(za)
            rb = self.dec_b(zb)
            return za, zb, ra, rb

    model = PairedAE(da, db, hidden, latent_dim).to(device)
    opt = torch.optim.Adam(model.parameters(), lr=lr)
    Xa_t = torch.tensor(Xa, dtype=torch.float32).to(device)
    Xb_t = torch.tensor(Xb, dtype=torch.float32).to(device)
    cos = torch.nn.CosineEmbeddingLoss(margin=0.0, reduction="mean")

    def corr_loss(z1, z2):
        """Correlation-based alignment (DeepCCA-style)."""
        z1 = z1 - z1.mean(0)
        z2 = z2 - z2.mean(0)
        cov = (z1 * z2).mean(0)
        std1 = z1.std(0) + 1e-8
        std2 = z2.std(0) + 1e-8
        corr = cov / (std1 * std2)
        return -corr.mean()  # we want to maximize the correlation

    for epoch in range(1, num_epoch + 1):
        perm = torch.randperm(n)
        total_loss = 0.0
        for i in range(0, n, batch_size):
            idx = perm[i:i + batch_size]
            xa = Xa_t[idx]
            xb = Xb_t[idx]
            za, zb, ra, rb = model(xa, xb)

            # reconstruction losses
            recon_a = torch.mean((ra - xa) ** 2)
            if use_bernoulli_b:
                recon_b = F.binary_cross_entropy(torch.sigmoid(rb), xb.clamp(0, 1))
            else:
                recon_b = torch.mean((rb - xb) ** 2)

            # cross reconstruction
            rb_from_a = model.dec_b(za)
            ra_from_b = model.dec_a(zb)
            cross_ab = torch.mean((rb_from_a - xb) ** 2)
            cross_ba = torch.mean((ra_from_b - xa) ** 2)

            # cosine alignment
            y = torch.ones(za.size(0), device=device)
            align = cos(za, zb, y)

            # correlation loss
            corr = corr_loss(za, zb)

            # total loss
            loss = (
                lam_recon_a * recon_a
                + lam_recon_b * recon_b
                + lam_cross * (cross_ab + cross_ba)
                + lam_align * align
                + lam_corr * corr
            )

            opt.zero_grad()
            loss.backward()
            opt.step()
            total_loss += loss.item()

        if verbose and (epoch % max(1, num_epoch // 10) == 0 or epoch == 1):
            print(f"[AE] Epoch {epoch:04d}/{num_epoch} "
                  f"| loss={total_loss:.4f} "
                  f"| rec_a={recon_a.item():.4f} rec_b={recon_b.item():.4f} "
                  f"| align={align.item():.4f} corr={-corr.item():.4f}")

    # final embeddings
    with torch.no_grad():
        zA, zB, _, _ = model(Xa_t, Xb_t)
        zA = F.normalize(zA, dim=1).cpu().numpy()
        zB = F.normalize(zB, dim=1).cpu().numpy()

    return model, zA, zB
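

# --- Usage sketch (synthetic data): what train_paired_ae expects and returns.
# Shapes and hyperparameters below are illustrative assumptions only.
def _example_paired_ae_on_random_data():
    """Train the paired AE on random matrices, just to show the calling convention."""
    rng = np.random.default_rng(0)
    Xa = rng.standard_normal((500, 100)).astype(np.float32)   # e.g. 500 cells x 100 RNA features
    Xb = rng.standard_normal((500, 40)).astype(np.float32)    # same 500 cells x 40 variant features
    model, zA, zB = train_paired_ae(
        _prep_embedding_for_training(Xa),
        _prep_embedding_for_training(Xb),
        latent_dim=16,
        num_epoch=20,
        verbose=False,
    )
    # zA and zB are L2-normalized per-cell latent codes with matching rows
    assert zA.shape == zB.shape == (500, 16)
    return model, zA, zB
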
"""correlation-based alignment (DeepCCA-style)""" z1 = z1 - z1.mean(0) z2 = z2 - z2.mean(0) cov = (z1 * z2).mean(0) std1 = z1.std(0) + 1e-8 std2 = z2.std(0) + 1e-8 corr = cov / (std1 * std2) return -corr.mean() # want to maximize corr for epoch in range(1, num_epoch + 1): perm = torch.randperm(n) total_loss = 0.0 for i in range(0, n, batch_size): idx = perm[i:i + batch_size] xa = Xa_t[idx] xb = Xb_t[idx] za, zb, ra, rb = model(xa, xb) # reconstruction losses recon_a = torch.mean((ra - xa) ** 2) if use_bernoulli_b: recon_b = F.binary_cross_entropy(torch.sigmoid(rb), xb.clamp(0, 1)) else: recon_b = torch.mean((rb - xb) ** 2) # cross reconstruction rb_from_a = model.dec_b(za) ra_from_b = model.dec_a(zb) cross_ab = torch.mean((rb_from_a - xb) ** 2) cross_ba = torch.mean((ra_from_b - xa) ** 2) # cosine alignment y = torch.ones(za.size(0), device=device) align = cos(za, zb, y) # correlation loss corr = corr_loss(za, zb) # total loss loss = ( lam_recon_a * recon_a + lam_recon_b * recon_b + lam_cross * (cross_ab + cross_ba) + lam_align * align + lam_corr * corr ) opt.zero_grad() loss.backward() opt.step() total_loss += loss.item() if verbose and (epoch % max(1, num_epoch // 10) == 0 or epoch == 1): print(f"[AE] Epoch {epoch:04d}/{num_epoch} " f"| loss={total_loss:.4f} " f"| rec_a={recon_a.item():.4f} rec_b={recon_b.item():.4f} " f"| align={align.item():.4f} corr={-corr.item():.4f}") # final embeddings with torch.no_grad(): zA, zB, _, _ = model(Xa_t, Xb_t) zA = F.normalize(zA, dim=1).cpu().numpy() zB = F.normalize(zB, dim=1).cpu().numpy() return model, zA, zB def variantAnalysis( adata, matrix_path=None, bcode_path=None, variants_path=None, min_cells=10, min_counts=20, variant_filter_level="Norm", n_pcs=50, # PCA facoltativa per ridurre rumore variant_rep="tfidf", # 'tfidf' | 'binary' | 'lognorm' ): """ Prepara la vista VAR a partire da adata.X (counts/binari). Applica filtro opzionale, rappresentazione scelta, scaling e PCA (facoltativa). Salva l'embedding in adata.uns['variant_X'] e genera variant_umap per QC. 
""" import scanpy as sc import numpy as np from scipy import sparse from sklearn.preprocessing import StandardScaler from sklearn.feature_extraction.text import TfidfTransformer print("[INFO] === VARIANT ANALYSIS START ===") print(f"[INFO] Filtraggio: {variant_filter_level} | Rappresentazione: {variant_rep} | PCA: {n_pcs}") X = adata.X.copy() n_total = X.shape[1] # --- soglie filtro --- if variant_filter_level.lower() == "none": min_cells_eff, min_counts_eff = 0, 0 elif variant_filter_level.lower() == "low": min_cells_eff = max(1, int(min_cells * 0.3)) min_counts_eff = max(1, int(min_counts * 0.3)) else: # "Norm" min_cells_eff, min_counts_eff = min_cells, min_counts # --- filtro per varianti rare/deboli --- if sparse.issparse(X): counts_per_var = np.asarray(X.sum(axis=0)).ravel() cells_per_var = np.asarray((X > 0).sum(axis=0)).ravel() else: counts_per_var = X.sum(axis=0) cells_per_var = (X > 0).sum(axis=0) mask = (cells_per_var >= min_cells_eff) & (counts_per_var >= min_counts_eff) removed = int((~mask).sum()) if removed > 0: adata = adata[:, mask].copy() X = adata.X print(f"[INFO] Varianti filtrate: {removed}/{n_total} → kept {adata.n_vars}") # --- rappresentazione varianti --- if variant_rep == "binary": if sparse.issparse(X): X_rep = (X > 0).astype("float32") else: X_rep = (X > 0).astype("float32") elif variant_rep == "lognorm": # counts → log1p(normalized) if sparse.issparse(X): lib = np.asarray(X.sum(axis=1)).ravel() lib[lib == 0] = 1.0 X_norm = X.multiply(1.0 / lib[:, None]) X_rep = X_norm else: lib = X.sum(axis=1, keepdims=True) lib[lib == 0] = 1.0 X_rep = X / lib # log1p if sparse.issparse(X_rep): X_rep = X_rep.tocsr(copy=True) X_rep.data = np.log1p(X_rep.data) else: X_rep = np.log1p(X_rep) else: # "tfidf" (default, consigliato per binario/sparse) tfidf = TfidfTransformer(norm=None, use_idf=True, smooth_idf=True, sublinear_tf=False) if sparse.issparse(X): X_rep = tfidf.fit_transform(X) else: X_rep = tfidf.fit_transform(sparse.csr_matrix(X)) # normalizza per cella (L2) per comparabilità from sklearn.preprocessing import normalize X_rep = normalize(X_rep, norm="l2", axis=1) # --- scaling & PCA (facoltativa) --- if n_pcs is not None: sc.pp.scale(adata, zero_center=False) # solo per settare .var e compatibilità # PCA con arpack non accetta direttamente sparse: convertiamo from sklearn.decomposition import PCA, TruncatedSVD if sparse.issparse(X_rep): # SVD tronca più stabile sullo sparse svd = TruncatedSVD(n_components=n_pcs, random_state=42) embedding = svd.fit_transform(X_rep) else: pca = PCA(n_components=n_pcs, random_state=42) embedding = pca.fit_transform(X_rep) else: embedding = X_rep.toarray() if sparse.issparse(X_rep) else X_rep # --- salva embedding per integrazione + QC UMAP --- from sklearn.preprocessing import StandardScaler adata.uns["variant_X"] = StandardScaler(with_mean=True, with_std=True).fit_transform( np.asarray(embedding, dtype=np.float32) ) # QC UMAP (2D) solo per ispezione sc.pp.neighbors(adata, use_rep=None, n_neighbors=15, n_pcs=None) # non usato per variant_X import umap emb2d = umap.UMAP(n_neighbors=10, min_dist=0.05, random_state=42).fit_transform(adata.uns["variant_X"]) adata.obsm["variant_umap"] = emb2d print(f"[INFO] Salvato adata.uns['variant_X'] shape={adata.uns['variant_X'].shape}") print("[INFO] === VARIANT ANALYSIS DONE ===") return adata def calcOmicsClusters(adata, omic_key, res=0.5, n_neighbors=None, n_pcs=None): """ Compute clustering (Leiden) on a given omic embedding (trans, variant, int). 

def weightsInit(m):
    if isinstance(m, torch.nn.Linear):
        torch.nn.init.xavier_uniform_(m.weight)   # in-place initializer (xavier_uniform is deprecated)
        m.bias.data.zero_()


def omicsIntegration(
    adata,
    transcript_key='trans',
    variant_key='variant',
    integration_key='int',
    latent_dim=32,
    num_epoch=300,
    lam_align=1.0,
    lam_recon_a=1.0,
    lam_recon_b=1.0,
    lam_cross=2.0,
    seed=42,
):
    """
    Adaptive omics integration with automatic regularization:
      - Small datasets → weighted PCA fusion (ConcatPCA-like) + AE refinement
      - Large datasets → full asymmetric AE integration
      - No CCA, stable scaling between modalities
    """
    import numpy as np
    import umap
    import scanpy as sc
    from sklearn.decomposition import PCA
    from sklearn.preprocessing import StandardScaler, normalize

    print("[INFO] === OMICS INTEGRATION (Regularization mode) START ===")

    Xa = np.asarray(adata.uns[transcript_key + "_X"], dtype=np.float32)
    Xb = np.asarray(adata.uns[variant_key + "_X"], dtype=np.float32)
    n_cells, n_featA = Xa.shape
    _, n_featB = Xb.shape
    assert Xa.shape[0] == Xb.shape[0], "RNA/VAR rows (cells) must match"

    # --- Global normalization ---
    Xa = StandardScaler(with_mean=True, with_std=True).fit_transform(Xa)
    Xb = StandardScaler(with_mean=True, with_std=True).fit_transform(Xb)

    # --- Adaptive selection ---
    regularization_mode = n_cells < 5000 or n_featB < 2000
    if regularization_mode:
        print("[INFO] Regularization mode → adaptive integration")
        # Weighted PCA fusion (ConcatPCA-style)
        scale_b = np.std(Xa) / (np.std(Xb) + 1e-9)
        Xb *= scale_b * 1.2
        X_concat = np.concatenate([Xa, Xb], axis=1)
        pca = PCA(n_components=min(latent_dim * 2, X_concat.shape[1]), random_state=seed)
        z_init = pca.fit_transform(X_concat)
        z_init = StandardScaler().fit_transform(z_init)
        z_init = normalize(z_init, norm="l2", axis=1)

        # Light AE refinement
        try:
            _, zA, zB = train_paired_ae(
                Xa, Xb,
                latent_dim=latent_dim,
                num_epoch=max(30, num_epoch // 3),
                lam_align=lam_align * 0.5,
                lam_recon_a=lam_recon_a,
                lam_recon_b=lam_recon_b,
                lam_cross=lam_cross * 0.5,
                verbose=False,
            )
            # z_init has up to latent_dim*2 components while zA/zB have latent_dim:
            # take the leading components of z_init so the blend is shape-consistent
            z = 0.5 * (z_init[:, :zA.shape[1]] + 0.5 * (zA + zB))
        except Exception as e:
            print(f"[WARN] AE refinement skipped ({e})")
            z = z_init
    else:
        print("[INFO] Full AE mode → asymmetric autoencoder integration")
        _, zA, zB = train_paired_ae(
            Xa, Xb,
            latent_dim=latent_dim,
            num_epoch=num_epoch,
            lam_align=lam_align,
            lam_recon_a=lam_recon_a,
            lam_recon_b=lam_recon_b,
            lam_cross=lam_cross,
            verbose=True,
        )
        z = 0.5 * (zA + zB)

    # --- Latent smoothing and normalization ---
    rng = np.random.default_rng(seed)  # seeded so the jitter is reproducible
    z = z + 0.05 * rng.standard_normal(z.shape).astype(np.float32)
    z = StandardScaler().fit_transform(z)
    z = normalize(z, norm="l2", axis=1)

    # --- UMAP computation ---
    um = umap.UMAP(n_neighbors=15, min_dist=0.05, random_state=seed).fit_transform(z)

    adata.uns[integration_key + "_X"] = z.astype(np.float32)
    adata.obsm[integration_key + "_umap"] = um
    adata.uns[integration_key + "_metrics"] = {
        "latent_dim": int(latent_dim),
        "num_epoch": int(num_epoch),
        "lam_align": float(lam_align),
        "lam_cross": float(lam_cross),
        "mode": "Regularization" if regularization_mode else "AE",
    }

    print("[INFO] === OMICS INTEGRATION DONE ===")
    return adata
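

# --- Usage sketch: integration assumes both views were computed on the SAME cells,
# i.e. adata.uns["trans_X"] and adata.uns["variant_X"] have the same number of rows.
# The hyperparameters below are illustrative.
def _example_integration(adata):
    """Run the adaptive integration and return the AnnData with 'int_X' / 'int_umap' added."""
    adata = omicsIntegration(
        adata,
        transcript_key="trans",
        variant_key="variant",
        integration_key="int",
        latent_dim=32,
        num_epoch=150,
    )
    print(adata.uns["int_metrics"])   # reports which mode was used and with which weights
    return adata
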

class pairedIntegration(torch.nn.Module):
    def __init__(self, input_dim_a=2000, input_dim_b=2000, clf_out=10):
        super(pairedIntegration, self).__init__()
        self.input_dim_a = input_dim_a
        self.input_dim_b = input_dim_b
        self.clf_out = clf_out
        self.encoder_a = torch.nn.Sequential(
            torch.nn.Linear(self.input_dim_a, 1000),
            torch.nn.BatchNorm1d(1000),
            torch.nn.ReLU(),
            torch.nn.Linear(1000, 512),
            torch.nn.BatchNorm1d(512),
            torch.nn.ReLU(),
            torch.nn.Linear(512, 128),
            torch.nn.BatchNorm1d(128),
            torch.nn.ReLU())
        self.encoder_b = torch.nn.Sequential(
            torch.nn.Linear(self.input_dim_b, 1000),
            torch.nn.BatchNorm1d(1000),
            torch.nn.ReLU(),
            torch.nn.Linear(1000, 512),
            torch.nn.BatchNorm1d(512),
            torch.nn.ReLU(),
            torch.nn.Linear(512, 128),
            torch.nn.BatchNorm1d(128),
            torch.nn.ReLU())
        self.clf = torch.nn.Sequential(
            torch.nn.Linear(128, self.clf_out),
            torch.nn.Softmax(dim=1))
        self.feature = torch.nn.Sequential(
            torch.nn.Linear(128, 32))

    def forward(self, x_a, x_b):
        out_a = self.encoder_a(x_a)
        f_a = self.feature(out_a)
        y_a = self.clf(out_a)
        out_b = self.encoder_b(x_b)
        f_b = self.feature(out_b)
        y_b = self.clf(out_b)
        return f_a, y_a, f_b, y_b


def pairedIntegrationTrainer(X_a, X_b, model, batch_size=512, num_epoch=5,
                             f_temp=0.1, p_temp=1.0):
    device = torch.device("cpu")
    f_con = contrastiveLoss(batch_size=batch_size, temperature=f_temp)
    p_con = contrastiveLoss(batch_size=model.clf_out, temperature=p_temp)
    opt = torch.optim.SGD(model.parameters(), lr=0.01, momentum=0.9, weight_decay=5e-4)
    for k in range(num_epoch):
        model.to(device)
        n = X_a.shape[0]
        r = np.random.permutation(n)
        X_train_a = X_a[r, :]
        X_tensor_A = torch.tensor(X_train_a).float()
        X_train_b = X_b[r, :]
        X_tensor_B = torch.tensor(X_train_b).float()
        losses = 0
        for j in range(n // batch_size):
            inputs_a = X_tensor_A[j * batch_size:(j + 1) * batch_size, :].to(device)
            inputs_a2 = inputs_a + torch.normal(0, 1, inputs_a.shape).to(device)
            inputs_a = inputs_a + torch.normal(0, 1, inputs_a.shape).to(device)
            inputs_b = X_tensor_B[j * batch_size:(j + 1) * batch_size, :].to(device)
            inputs_b = inputs_b + torch.normal(0, 1, inputs_b.shape).to(device)
            feas, o, nfeas, no = model(inputs_a, inputs_b)
            feas2, o2, _, _ = model(inputs_a2, inputs_b)
            fea_mi = f_con(feas, nfeas) + f_con(feas, feas2)
            p_mi = p_con(o.T, no.T) + p_con(o.T, o2.T)
            loss = fea_mi + p_mi
            opt.zero_grad()
            loss.backward()
            opt.step()
            losses += loss.item()
        print("Total loss: " + str(round(losses, 4)))
        gc.collect()


class contrastiveLoss(torch.nn.Module):
    def __init__(self, batch_size, temperature=0.5):
        super().__init__()
        self.batch_size = batch_size
        self.register_buffer("temperature", torch.tensor(temperature))
        self.register_buffer("negatives_mask",
                             (~torch.eye(batch_size * 2, batch_size * 2, dtype=bool)).float())

    def forward(self, emb_i, emb_j):
        """
        emb_i and emb_j are batches of embeddings, where corresponding indices are pairs
        (z_i, z_j as per the SimCLR paper).
        """
        z_i = F.normalize(emb_i, dim=1, p=2)
        z_j = F.normalize(emb_j, dim=1, p=2)
        representations = torch.cat([z_i, z_j], dim=0)
        similarity_matrix = F.cosine_similarity(representations.unsqueeze(1),
                                                representations.unsqueeze(0), dim=2)
        sim_ij = torch.diag(similarity_matrix, self.batch_size)
        sim_ji = torch.diag(similarity_matrix, -self.batch_size)
        positives = torch.cat([sim_ij, sim_ji], dim=0)
        nominator = torch.exp(positives / self.temperature)
        denominator = self.negatives_mask * torch.exp(similarity_matrix / self.temperature)
        loss_partial = -torch.log(nominator / torch.sum(denominator, dim=1))
        loss = torch.sum(loss_partial) / (2 * self.batch_size)
        return loss
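

# --- Usage sketch: the NT-Xent-style contrastiveLoss expects two batches of embeddings
# whose matching rows are positive pairs; batch_size passed at construction must equal
# the actual batch size. The numbers below are arbitrary.
def _example_contrastive_loss():
    """Evaluate contrastiveLoss on two random embedding batches (illustrative)."""
    batch = 64
    loss_fn = contrastiveLoss(batch_size=batch, temperature=0.1)
    emb_i = torch.randn(batch, 32)
    emb_j = emb_i + 0.1 * torch.randn(batch, 32)   # a noisy "view" of the same cells
    loss = loss_fn(emb_i, emb_j)
    return loss.item()
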

def distributionClusters(adata, control_cl, group_cl, perc_cell_to_show=0.1,
                         figsize=(12, 8), dpi=100, save_path=None):
    df = adata.obs.groupby([group_cl, control_cl]).size().unstack()
    # drop rows whose group cluster accounts for a smaller fraction of cells than perc_cell_to_show
    df = df.loc[df.sum(axis=1) / df.values.sum() >= perc_cell_to_show, :]
    df_rel = df
    df = df.div(df.sum(axis=1), axis=0)
    df[group_cl] = df.index
    plt.rcParams["figure.figsize"] = figsize
    plt.rcParams['figure.dpi'] = dpi
    plt.rcParams['figure.facecolor'] = '#FFFFFF'
    df.plot(
        x=group_cl,
        kind='barh',
        stacked=True,
        mark_right=True,
    )
    leg = plt.legend(bbox_to_anchor=(1, 1), loc="upper left")
    leg.get_frame().set_edgecolor('black')
    plt.xlabel('perc_' + control_cl, loc='center')
    for n in df_rel:
        for i, (pos_x, ab, value) in enumerate(zip(df.iloc[:, :-1].cumsum(1)[n], df[n], df_rel[n])):
            if (value == 0) | (ab <= 0.05):
                value = ''
            plt.text(pos_x - ab / 2, i, str(value), va='center', ha='center')
    plt.grid(False)
    if save_path is not None:
        plt.savefig(save_path, bbox_inches='tight')
    return plt.show()


# ================================================================
# 🔧 OUTPUT HANDLER: save UMAP figures to the output folder
# ================================================================
import os
import matplotlib.pyplot as plt
import scanpy as sc


def save_all_umaps(adata, prefix="output", color_by=None, dpi=300):
    """
    Save every UMAP found in adata.obsm as a PNG image.
    The AnnData itself is not saved; no .h5ad file is written.
    """
    import os
    import scanpy as sc
    import matplotlib.pyplot as plt

    os.makedirs(prefix, exist_ok=True)
    print(f"[INFO] Output folder: {prefix}")

    # === Find all available UMAPs ===
    umap_keys = [k for k in adata.obsm.keys() if k.endswith("_umap") or k == "X_umap"]
    if not umap_keys:
        print("[WARN] No UMAP found in adata.obsm")
        return
    print(f"[INFO] UMAPs found: {umap_keys}")

    # === Decide what to use for coloring ===
    if color_by is None:
        cluster_cols = [c for c in adata.obs.columns if "clust" in c.lower()]
        color_by = cluster_cols if cluster_cols else ["n_genes"]
    elif isinstance(color_by, str):
        color_by = [color_by]
    print(f"[INFO] Colorings to use: {color_by}")

    # === Save every UMAP × color combination ===
    for key in umap_keys:
        for color in color_by:
            fig_path = os.path.join(prefix, f"{key}_{color}.png")
            try:
                sc.pl.embedding(
                    adata,
                    basis=key,
                    color=color,
                    frameon=False,
                    show=False,
                )
                plt.savefig(fig_path, dpi=dpi, bbox_inches="tight")
                plt.close()
                print(f"[OK] Saved {fig_path}")
            except Exception as e:
                print(f"[WARN] Error saving {fig_path}: {e}")

    print("[✅] All UMAPs saved successfully.")
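

# ================================================================
# Usage sketch: one possible end-to-end wiring of the functions above.
# This is NOT part of the pipeline itself: the paths are placeholders, and copying the
# variant embedding into the RNA AnnData is an assumed glue step that the original
# script does not show.
# ================================================================
def _example_full_pipeline():
    """Illustrative workflow: RNA view → VAR view → integration → clustering → figures."""
    # 1) RNA view (hypothetical 10x folder and barcode list)
    adata = transcriptomicAnalysis(
        path_10x="data/filtered_feature_bc_matrix",
        bcode_variants="data/variant_barcodes.tsv",
        n_pcs=30,
    )

    # 2) VAR view, assuming a cell-by-variant matrix whose rows already match adata.obs
    var_counts = io.mmread("data/variants_matrix.mtx").tocsr()   # hypothetical path
    adata_var = anndata.AnnData(X=var_counts)
    adata_var = variantAnalysis(adata_var, variant_rep="tfidf", n_pcs=50)

    # 3) Copy the VAR embedding next to the RNA one and integrate (assumed glue step)
    adata.uns["variant_X"] = adata_var.uns["variant_X"]
    adata = omicsIntegration(adata, latent_dim=32, num_epoch=150)

    # 4) Clustering on the integrated space and figure export
    adata = calcOmicsClusters(adata, omic_key="int", res=0.5)
    save_all_umaps(adata, prefix="output")
    return adata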