# import packs import numpy as np import pandas as pd import scanpy as sc import gc import torch import torch.nn.functional as F import umap import anndata from sklearn.preprocessing import StandardScaler from sklearn.metrics.cluster import adjusted_rand_score from sklearn.metrics import normalized_mutual_info_score from sklearn.feature_extraction.text import TfidfTransformer from scipy import io from scipy import sparse import matplotlib.pyplot as plt ### FUNCTION ### ################ def transcriptomicAnalysis( path_10x, bcode_variants, high_var_genes_min_mean=0.015, high_var_genes_max_mean=3, high_var_genes_min_disp=0.75, min_genes=200, min_cells=3, max_pct_mt=100, n_neighbors=20, n_pcs=30, transcript_key='trans', manual_matrix_reading=False, force_float64=True, ): """ Preprocess transcriptomic (RNA) data from 10x Genomics format. Ora la normalizzazione e lo scaling replicano esattamente la pipeline Muon: normalize_total → log1p → scale(max_value=10) """ import scanpy as sc import anndata import pandas as pd import numpy as np import scipy.sparse as sp from scipy import io from sklearn.preprocessing import StandardScaler np.set_printoptions(precision=6, suppress=False, linewidth=140) print("[INFO] === TRANSCRIPTOMIC ANALYSIS START ===") # === Lettura dati RNA === if manual_matrix_reading: rna = path_10x + '/matrix.mtx.gz' barcode = path_10x + '/barcodes.tsv.gz' feat = path_10x + '/features.tsv.gz' M = io.mmread(rna).astype(np.float64 if force_float64 else np.float32) B = M.T # (cells x genes) barcode = pd.read_csv(barcode, sep='\t', header=None, names=['bcode']) barcode.index = barcode['bcode'] feat = pd.read_csv(feat, sep='\t', header=None, names=['gene_ids', 'gene_symbol', 'feature_types']) feat.index = feat['gene_symbol'] adata = anndata.AnnData(X=B, obs=barcode, var=feat) adata.X = sp.csr_matrix(adata.X) else: adata = sc.read_10x_mtx(path_10x, var_names='gene_symbols', cache=True) adata.X = adata.X.astype(np.float64 if force_float64 else np.float32) # === Salva copia grezza === if not sp.issparse(adata.X): adata.X = sp.csr_matrix(adata.X) adata.uns[f"{transcript_key}_raw"] = adata.X.copy() adata.uns[f"{transcript_key}_raw_obs_names"] = np.array(adata.obs_names) adata.uns[f"{transcript_key}_raw_var_names"] = np.array(adata.var_names) print(f"[DEBUG] Raw RNA matrix shape: {adata.X.shape}") raw_block = adata.X[:5, :5].toarray().astype(float) print(f"[DEBUG] Prime 5×5 celle RAW (counts):\n{np.array2string(raw_block, precision=5, floatmode='maxprec_equal')}") # === Filtra barcode presenti anche nel file varianti === bcode_var = pd.read_csv(bcode_variants, sep='\t', header=None)[0] adata = adata[adata.obs.index.isin(bcode_var)].copy() # === Filtraggio e QC === sc.pp.filter_cells(adata, min_genes=min_genes) sc.pp.filter_genes(adata, min_cells=min_cells) adata.var['mt'] = adata.var_names.str.startswith('MT-') sc.pp.calculate_qc_metrics(adata, qc_vars=['mt'], percent_top=None, log1p=False, inplace=True) adata = adata[adata.obs.pct_counts_mt < max_pct_mt, :].copy() # === Normalizzazione Muon-style === sc.pp.normalize_total(adata) sc.pp.log1p(adata) sc.pp.scale(adata, max_value=10) # === Rimozione eventuali NaN/inf === if sp.issparse(adata.X): adata.X.data[np.isnan(adata.X.data)] = 0 adata.X.data[np.isinf(adata.X.data)] = 0 else: adata.X = np.nan_to_num(adata.X, nan=0.0, posinf=0.0, neginf=0.0) print(f"[DEBUG] RNA normalizzata e scalata (Muon-style). Shape: {adata.X.shape}") norm_block = adata.X[:5, :5].toarray().astype(float) if sp.issparse(adata.X) else adata.X[:5, :5].astype(float) print(f"[DEBUG] Prime 5×5 celle RNA (normalizzate):\n{np.array2string(norm_block, precision=4, floatmode='maxprec_equal')}") # === PCA opzionale === if n_pcs is not None: print(f"[INFO] PCA con {n_pcs} componenti...") sc.pp.pca(adata, n_comps=n_pcs) sc.pp.neighbors(adata, n_neighbors=n_neighbors, n_pcs=n_pcs, random_state=42) sc.tl.umap(adata, random_state=42) adata.obsm[f"{transcript_key}_umap"] = adata.obsm["X_umap"] embedding = adata.obsm["X_pca"] else: print("[INFO] PCA disabilitata → uso matrice normalizzata intera.") embedding = adata.X.toarray() if sp.issparse(adata.X) else adata.X # === Scaling per range uniforme (senza centrare) === scaler = StandardScaler(with_mean=False) adata.uns[f"{transcript_key}_X"] = scaler.fit_transform(embedding) print(f"[INFO] Salvato adata.uns['{transcript_key}_X'] shape={adata.uns[f'{transcript_key}_X'].shape}") print(f"[INFO] === TRANSCRIPTOMIC ANALYSIS DONE ({adata.n_obs} cells, {adata.n_vars} genes) ===") return adata def _prep_embedding_for_training(X): """ Z-score per feature, poi L2 per cella. Restituisce float32 numpy. """ import numpy as np from sklearn.preprocessing import StandardScaler, normalize X = np.asarray(X, dtype=np.float32) X = StandardScaler(with_mean=True, with_std=True).fit_transform(X) X = normalize(X, norm="l2", axis=1) return X class PairedAE(torch.nn.Module): """ Due encoder (RNA/VAR) e due decoder con bottleneck condiviso (latent_dim). Loss: ricostruzione + allineamento (cosine) tra z_rna e z_var. """ def __init__(self, dim_a, dim_b, latent_dim=32, hidden=256): super().__init__() self.encoder_a = torch.nn.Sequential( torch.nn.Linear(dim_a, hidden), torch.nn.ReLU(), torch.nn.Linear(hidden, latent_dim), ) self.encoder_b = torch.nn.Sequential( torch.nn.Linear(dim_b, hidden), torch.nn.ReLU(), torch.nn.Linear(hidden, latent_dim), ) self.decoder_a = torch.nn.Sequential( torch.nn.Linear(latent_dim, hidden), torch.nn.ReLU(), torch.nn.Linear(hidden, dim_a), ) self.decoder_b = torch.nn.Sequential( torch.nn.Linear(latent_dim, hidden), torch.nn.ReLU(), torch.nn.Linear(hidden, dim_b), ) def forward(self, xa, xb): za = self.encoder_a(xa) zb = self.encoder_b(xb) ra = self.decoder_a(za) rb = self.decoder_b(zb) return za, zb, ra, rb def train_paired_ae( Xa, Xb, latent_dim=32, hidden=256, batch_size=256, num_epoch=300, lr=1e-3, lam_align=0.5, # forza allineamento lam_recon_a=1.0, # ricostruzione RNA lam_recon_b=3.0, # ricostruzione VAR lam_cross=2.0, # nuova: RNA→VAR consistency verbose=True, ): """ Paired Autoencoder asimmetrico: RNA = teacher, VAR = student. Loss = recA + recB + cross(A→B) + cosine alignment. """ import torch, torch.nn.functional as F, numpy as np Xa = _prep_embedding_for_training(Xa) Xb = _prep_embedding_for_training(Xb) Xa, Xb = np.asarray(Xa, np.float32), np.asarray(Xb, np.float32) n, da = Xa.shape _, db = Xb.shape class AsymAE(torch.nn.Module): def __init__(self, da, db, latent_dim, hidden): super().__init__() self.enc_a = torch.nn.Sequential( torch.nn.Linear(da, hidden), torch.nn.ReLU(), torch.nn.Dropout(0.1), torch.nn.Linear(hidden, latent_dim) ) self.enc_b = torch.nn.Sequential( torch.nn.Linear(db, hidden), torch.nn.ReLU(), torch.nn.Linear(hidden, latent_dim) ) self.dec_a = torch.nn.Sequential( torch.nn.Linear(latent_dim, hidden), torch.nn.ReLU(), torch.nn.Linear(hidden, da) ) self.dec_b = torch.nn.Sequential( torch.nn.Linear(latent_dim, hidden), torch.nn.ReLU(), torch.nn.Linear(hidden, db) ) def forward(self, xa, xb): za = self.enc_a(xa) zb = self.enc_b(xb) ra = self.dec_a(za) rb = self.dec_b(zb) return za, zb, ra, rb device = torch.device("cpu") model = AsymAE(da, db, latent_dim, hidden).to(device) opt = torch.optim.Adam(model.parameters(), lr=lr) cos = torch.nn.CosineEmbeddingLoss(margin=0.0) Xa_t, Xb_t = torch.tensor(Xa), torch.tensor(Xb) for ep in range(1, num_epoch + 1): perm = torch.randperm(n) total = 0 for i in range(0, n, batch_size): idx = perm[i:i+batch_size] xa, xb = Xa_t[idx].to(device), Xb_t[idx].to(device) za, zb, ra, rb = model(xa, xb) rec_a = F.mse_loss(ra, xa) rec_b = F.mse_loss(rb, xb) cross = F.mse_loss(zb, za.detach()) # VAR deve seguire RNA y = torch.ones(za.shape[0], device=device) align = cos(za, zb, y) loss = lam_recon_a*rec_a + lam_recon_b*rec_b + lam_cross*cross + lam_align*align opt.zero_grad() loss.backward() opt.step() total += loss.item() if verbose and (ep % max(1, num_epoch//10) == 0 or ep==1): print(f"[AE] Epoch {ep:04d}/{num_epoch} | loss={total:.3f} | recA={rec_a:.3f} recB={rec_b:.3f} cross={cross:.3f}") with torch.no_grad(): zA, zB, _, _ = model(Xa_t, Xb_t) zA = F.normalize(zA, dim=1).cpu().numpy() zB = F.normalize(zB, dim=1).cpu().numpy() return model, zA, zB def variantAnalysis( adata, matrix_path=None, bcode_path=None, variants_path=None, min_cells=5, max_cell_fraction=0.95, variant_filter_level="norm", n_pcs=50, variant_rep="muon", # default comportamentale come Muon variant_key="variant", ): """ Preprocess variant (DNA) data. Supporta più rappresentazioni: - muon/binary: binaria + scalata (default) - tfidf: TF-IDF + L2 norm - lognorm: log1p normalizzato In tutti i casi salva: - adata.uns['variant_raw'] matrice grezza - adata.uns['variant_X'] embedding numerico (PCA/scaled) - adata.obsm['variant_umap'] embedding 2D per QC """ import scanpy as sc import numpy as np import pandas as pd from scipy import sparse, io from sklearn.preprocessing import StandardScaler, normalize from sklearn.decomposition import PCA, TruncatedSVD from sklearn.feature_extraction.text import TfidfTransformer import umap np.set_printoptions(precision=6, suppress=False, linewidth=140) print("[INFO] === VARIANT ANALYSIS START ===") print(f"[INFO] Lettura file: {matrix_path}") # --- Lettura matrice --- var_mtx = io.mmread(matrix_path) X = sparse.csr_matrix(var_mtx) barcodes = pd.read_csv(bcode_path, sep="\t", header=None)[0].astype(str).values variants = pd.read_csv(variants_path, sep="\t", header=None)[0].astype(str).values # --- Fix orientamento --- if X.shape[0] == len(variants) and X.shape[1] == len(barcodes): print(f"[WARN] Transposing variant matrix {X.shape} → expected (cells × variants)") X = X.T elif X.shape[1] != len(variants): print(f"[WARN] Dimensioni inconsuete {X.shape}, provo a trasporre per sicurezza") X = X.T print(f"[INFO] Matrice varianti caricata → {X.shape[0]} celle × {X.shape[1]} varianti") # === Debug blocco RAW === raw_block = X[:5, :5].toarray() print(f"[DEBUG] Prime 5×5 celle RAW (counts):\n{np.round(raw_block, 2)}") # --- Allineamento con RNA --- rna_barcodes = np.array([b.split('-')[0] for b in adata.obs_names]) var_barcodes = np.array([b.split('-')[0] for b in barcodes]) common = np.intersect1d(var_barcodes, rna_barcodes) if len(common) == 0: raise ValueError("[ERROR] Nessun barcode comune tra RNA e VAR.") order_idx = np.array([np.where(var_barcodes == b)[0][0] for b in rna_barcodes if b in var_barcodes]) X = X[order_idx, :] barcodes = barcodes[order_idx] print(f"[INFO] Celle comuni con RNA: {len(barcodes)}") # --- Salva GREZZI --- adata.uns[f"{variant_key}_raw"] = X.copy() adata.uns[f"{variant_key}_raw_obs_names"] = barcodes adata.uns[f"{variant_key}_raw_var_names"] = variants # === Selezione modalità di rappresentazione === rep = variant_rep.lower().strip() print(f"[INFO] Rappresentazione scelta: {rep.upper()}") # =============================== # MUON / BINARY (default) # =============================== if rep in ["muon", "binary"]: print("[INFO] Uso pipeline Muon-style (binary + scale)") import scanpy as sc X_bin = (X > 0).astype(float) dna = sc.AnnData(X_bin) dna.obs_names = barcodes dna.var_names = variants # Filtro varianti rare e troppo comuni sc.pp.filter_genes(dna, min_cells=min_cells) freq = np.array(dna.X.sum(axis=0)).flatten() / dna.n_obs keep = freq < max_cell_fraction dna = dna[:, keep].copy() print(f"[INFO] Varianti mantenute: {dna.n_vars}/{len(variants)}") # Scala sc.pp.scale(dna, max_value=10) norm_block = dna.X[:5, :5].toarray() if sparse.issparse(dna.X) else dna.X[:5, :5] print(f"[DEBUG] Prime 5×5 celle DNA (normalizzate):\n{np.array2string(norm_block, precision=4, floatmode='maxprec_equal')}") sc.tl.pca(dna, n_comps=n_pcs) sc.pp.neighbors(dna, n_pcs=min(30, n_pcs)) sc.tl.umap(dna, random_state=42) adata.uns[f"{variant_key}_X"] = dna.obsm["X_pca"] adata.obsm[f"{variant_key}_umap"] = dna.obsm["X_umap"] # =============================== # TF-IDF # =============================== elif rep == "tfidf": print("[INFO] Uso rappresentazione TF-IDF + L2 norm") tfidf = TfidfTransformer(norm=None, use_idf=True, smooth_idf=True) X_rep = tfidf.fit_transform(X) X_rep = normalize(X_rep, norm="l2", axis=1) rep_block = X_rep[:5, :5].toarray() print(f"[DEBUG] Prime 5×5 celle TFIDF:\n{np.round(rep_block, 4)}") # PCA o SVD if sparse.issparse(X_rep): svd = TruncatedSVD(n_components=n_pcs, random_state=42) embedding = svd.fit_transform(X_rep) else: pca = PCA(n_components=n_pcs, random_state=42) embedding = pca.fit_transform(X_rep) scaler = StandardScaler(with_mean=True, with_std=True) adata.uns[f"{variant_key}_X"] = scaler.fit_transform(np.asarray(embedding, dtype=np.float32)) # =============================== # LOGNORM # =============================== elif rep == "lognorm": print("[INFO] Uso rappresentazione log1p-normalized") lib = np.asarray(X.sum(axis=1)).ravel() lib[lib == 0] = 1.0 X_norm = X.multiply(1.0 / lib[:, None]) X_norm.data = np.log1p(X_norm.data) rep_block = X_norm[:5, :5].toarray() print(f"[DEBUG] Prime 5×5 celle LOGNORM:\n{np.round(rep_block, 4)}") if sparse.issparse(X_norm): svd = TruncatedSVD(n_components=n_pcs, random_state=42) embedding = svd.fit_transform(X_norm) else: pca = PCA(n_components=n_pcs, random_state=42) embedding = pca.fit_transform(X_norm) scaler = StandardScaler(with_mean=True, with_std=True) adata.uns[f"{variant_key}_X"] = scaler.fit_transform(np.asarray(embedding, dtype=np.float32)) # =============================== # ERRORE SE MODALITÀ NON VALIDA # =============================== else: raise ValueError(f"[ERROR] variant_rep '{variant_rep}' non riconosciuto. Usa 'muon', 'binary', 'tfidf' o 'lognorm'.") # --- UMAP QC (solo se non già fatto) --- if f"{variant_key}_umap" not in adata.obsm: reducer = umap.UMAP(n_neighbors=10, min_dist=0.05, random_state=42) emb2d = reducer.fit_transform(adata.uns[f"{variant_key}_X"]) adata.obsm[f"{variant_key}_umap"] = emb2d print(f"[INFO] DNA PCA shape: {adata.uns[f'{variant_key}_X'].shape}") print(f"[INFO] === VARIANT ANALYSIS DONE ({X.shape[0]} cells, {X.shape[1]} variants) ===") return adata def calcOmicsClusters(adata, omic_key, res=0.5, n_neighbors=None, n_pcs=None): """ Compute clustering (Leiden) on a given omic embedding (trans, variant, int). Automatically adapts to 2D UMAPs or high-dimensional embeddings. """ import scanpy as sc import numpy as np # Default n_neighbors if n_neighbors is None: n_neighbors = 15 # --- Detect representation type --- use_rep = omic_key + "_umap" if use_rep in adata.obsm: # Caso 2D UMAP print(f"[INFO] Using 2D UMAP embedding: {use_rep}") sc.pp.neighbors( adata, n_neighbors=n_neighbors, use_rep=use_rep, key_added=f"{omic_key}_neighbors" ) elif omic_key + "_X" in adata.uns: # Caso embedding ad alta dimensionalità salvato in uns X = adata.uns.get(omic_key + "_X") if X is None or not isinstance(X, (np.ndarray, list)): raise ValueError(f"adata.uns['{omic_key}_X'] non valido o vuoto.") adata.obsm[omic_key + "_X"] = np.asarray(X, dtype=np.float32) dims = adata.obsm[omic_key + "_X"].shape[1] print(f"[INFO] Using high-dimensional embedding: {omic_key}_X ({dims} dims)") sc.pp.neighbors( adata, n_neighbors=n_neighbors, n_pcs=min(n_pcs or dims, dims), use_rep=omic_key + "_X", key_added=f"{omic_key}_neighbors" ) else: raise ValueError(f"No valid representation found for omic_key='{omic_key}'") # --- Leiden clustering --- sc.tl.leiden( adata, resolution=res, key_added=f"{omic_key}_clust_{res}", neighbors_key=f"{omic_key}_neighbors" ) print(f"[INFO] Leiden clustering completato su {omic_key} con res={res}") return adata def weightsInit(m): if isinstance(m, torch.nn.Linear): torch.nn.init.xavier_uniform(m.weight.data) m.bias.data.zero_() def omicsIntegration( adata, transcript_key="trans", variant_key="variant", integration_key="int", latent_dim=32, num_epoch=300, lam_align=0.5, lam_recon_a=1.0, lam_recon_b=1.0, lam_cross=2.0, seed=42, res=None, # 👈 opzionale, se specificato calcola Leiden ): """ Integrazione RNA+VAR: - Se n_cells < 2000 → PCA concatenata (Muon-style) - Se n_cells >= 2000 → Autoencoder asimmetrico (RNA=teacher, VAR=student) Se res è specificato, calcola il Leiden integrato a quella risoluzione. """ import numpy as np import scanpy as sc print("[INFO] === OMICS INTEGRATION START ===") # --- verifica input --- assert transcript_key + "_X" in adata.uns, f"Missing adata.uns['{transcript_key}_X']" assert variant_key + "_X" in adata.uns, f"Missing adata.uns['{variant_key}_X']" X_a = np.asarray(adata.uns[transcript_key + "_X"], dtype=np.float32) X_b = np.asarray(adata.uns[variant_key + "_X"], dtype=np.float32) assert X_a.shape[0] == X_b.shape[0], "RNA/VAR rows (cells) must match" n_cells = X_a.shape[0] # --- bilancia varianze --- X_a /= np.std(X_a) X_b /= np.std(X_b) # ========================================================== # === BLOCCO 1: PCA CONCATENATA (dataset piccolo < 2000) === # ========================================================== if n_cells < 2000: print(f"[INFO] n_cells={n_cells} < 2000 → uso PCA concatenata Muon-style") from sklearn.decomposition import PCA # --- usa PCA già esistenti se presenti --- if f"{transcript_key}_pca" in adata.obsm and f"{variant_key}_pca" in adata.obsm: print(f"[INFO] Uso PCA pre-esistenti da adata.obsm['{transcript_key}_pca'] e adata.obsm['{variant_key}_pca']") pca_rna = adata.obsm[f"{transcript_key}_pca"] pca_var = adata.obsm[f"{variant_key}_pca"] else: print(f"[WARN] PCA non trovate in adata.obsm, le ricomputo localmente.") n_comp = min(50, X_a.shape[1], X_b.shape[1]) pca_rna = PCA(n_components=n_comp, random_state=seed).fit_transform(X_a) pca_var = PCA(n_components=n_comp, random_state=seed).fit_transform(X_b) adata.obsm[f"{transcript_key}_pca"] = pca_rna adata.obsm[f"{variant_key}_pca"] = pca_var # --- controlla coerenza --- if pca_rna.shape[0] != pca_var.shape[0]: raise ValueError(f"PCA RNA e VAR hanno dimensioni diverse: {pca_rna.shape[0]} vs {pca_var.shape[0]}") # --- concatenazione PCA --- X_concat = np.concatenate([pca_rna, pca_var], axis=1) adata.uns[integration_key + "_X"] = X_concat.copy() adata.obsm["X_concat_pca"] = X_concat.copy() print(f"[INFO] Concatenazione PCA completata: {X_concat.shape}") concat_block = X_concat[:5, :5] print(f"[DEBUG] Prime 5×5 celle CONCATENATE:\n{np.round(concat_block, 4)}") print(f"[DEBUG] Somma prime 5 righe concatenata: {[round(X_concat[i,:].sum(), 4) for i in range(5)]}") print(f"[DEBUG] Media varianza PCA RNA vs VAR: {pca_rna.var():.3f} / {pca_var.var():.3f}") # --- Neighbors + UMAP integrato --- sc.pp.neighbors(adata, use_rep="X_concat_pca", key_added=f"{integration_key}_neighbors") sc.tl.umap(adata) adata.obsm[f"{integration_key}_umap"] = adata.obsm["X_umap"].copy() # --- Leiden opzionale --- if res is not None: key = f"{integration_key}_clust_{res:.2f}".rstrip("0").rstrip(".") print(f"[INFO] Calcolo Leiden integrato per res={res}") sc.tl.leiden( adata, resolution=res, key_added=key, neighbors_key=f"{integration_key}_neighbors", flavor="igraph", n_iterations=2, directed=False, random_state=seed, ) n_clusters = adata.obs[key].nunique() adata.obs[key] = adata.obs[key].astype("category") adata.uns[key] = {"resolution": res, "n_clusters": n_clusters} print(f"[INFO] → Creato {key} ({n_clusters} cluster)") print("[INFO] === MUON-STYLE INTEGRATION COMPLETATA ===") return adata # ======================================================== # === BLOCCO 2: AUTOENCODER ASIMMETRICO (dataset grande) === # ======================================================== else: print(f"[INFO] n_cells={n_cells} ≥ 2000 → uso autoencoder asimmetrico (RNA→VAR)") from sklearn.preprocessing import StandardScaler import umap _, zA, zB = train_paired_ae( X_a, X_b, latent_dim=latent_dim, num_epoch=num_epoch, lam_align=lam_align, lam_recon_a=lam_recon_a, lam_recon_b=lam_recon_b, lam_cross=lam_cross, verbose=True, ) simAE = float(np.mean(np.sum(zA * zB, axis=1))) zAE = 0.5 * (zA + zB) adata.uns[integration_key + "_X"] = zAE.astype(np.float32) adata.uns[integration_key + "_metrics"] = { "simAE": simAE, "latent_dim": int(latent_dim), "num_epoch": int(num_epoch), "lam_align": float(lam_align), "lam_recon_b": float(lam_recon_b), "lam_cross": float(lam_cross), } um = umap.UMAP(n_neighbors=15, min_dist=0.05, random_state=seed).fit_transform(zAE) adata.obsm[f"{integration_key}_umap"] = um # --- Leiden opzionale --- if res is not None: key = f"{integration_key}_clust_{res:.2f}".rstrip("0").rstrip(".") print(f"[INFO] Calcolo Leiden integrato per res={res}") sc.pp.neighbors(adata, use_rep=f"{integration_key}_X", key_added=f"{integration_key}_neighbors") sc.tl.leiden( adata, resolution=res, key_added=key, neighbors_key=f"{integration_key}_neighbors", flavor="igraph", n_iterations=2, directed=False, random_state=seed, ) n_clusters = adata.obs[key].nunique() adata.obs[key] = adata.obs[key].astype("category") adata.uns[key] = {"resolution": res, "n_clusters": n_clusters} print(f"[INFO] → Creato {key} ({n_clusters} cluster)") print(f"[INT] AE similarity={simAE:.3f}") print("[INFO] === AUTOENCODER INTEGRATION COMPLETATA ===") return adata class pairedIntegration(torch.nn.Module): def __init__(self,input_dim_a=2000,input_dim_b=2000,clf_out=10): super(pairedIntegration, self).__init__() self.input_dim_a = input_dim_a self.input_dim_b = input_dim_b self.clf_out = clf_out self.encoder_a = torch.nn.Sequential( torch.nn.Linear(self.input_dim_a, 1000), torch.nn.BatchNorm1d(1000), torch.nn.ReLU(), torch.nn.Linear(1000, 512), torch.nn.BatchNorm1d(512), torch.nn.ReLU(), torch.nn.Linear(512, 128), torch.nn.BatchNorm1d(128), torch.nn.ReLU()) self.encoder_b = torch.nn.Sequential( torch.nn.Linear(self.input_dim_b, 1000), torch.nn.BatchNorm1d(1000), torch.nn.ReLU(), torch.nn.Linear(1000, 512), torch.nn.BatchNorm1d(512), torch.nn.ReLU(), torch.nn.Linear(512, 128), torch.nn.BatchNorm1d(128), torch.nn.ReLU()) self.clf = torch.nn.Sequential( torch.nn.Linear(128, self.clf_out), torch.nn.Softmax(dim=1)) self.feature = torch.nn.Sequential( torch.nn.Linear(128, 32)) def forward(self, x_a,x_b): out_a = self.encoder_a(x_a) f_a = self.feature(out_a) y_a = self.clf(out_a) out_b = self.encoder_b(x_b) f_b = self.feature(out_b) y_b = self.clf(out_b) return f_a,y_a,f_b,y_b def pairedIntegrationTrainer(X_a, X_b, model, batch_size = 512, num_epoch=5, f_temp = 0.1, p_temp = 1.0): device = torch.device("cpu") f_con = contrastiveLoss(batch_size = batch_size,temperature = f_temp) p_con = contrastiveLoss(batch_size = model.clf_out,temperature = p_temp) opt = torch.optim.SGD(model.parameters(),lr=0.01, momentum=0.9,weight_decay=5e-4) for k in range(num_epoch): model.to(device) n = X_a.shape[0] r = np.random.permutation(n) X_train_a = X_a[r,:] X_tensor_A=torch.tensor(X_train_a).float() X_train_b = X_b[r,:] X_tensor_B=torch.tensor(X_train_b).float() losses = 0 for j in range(n//batch_size): inputs_a = X_tensor_A[j*batch_size:(j+1)*batch_size,:].to(device) inputs_a2 = inputs_a + torch.normal(0,1,inputs_a.shape).to(device) inputs_a = inputs_a + torch.normal(0,1,inputs_a.shape).to(device) inputs_b = X_tensor_B[j*batch_size:(j+1)*batch_size,:].to(device) inputs_b = inputs_b + torch.normal(0,1,inputs_b.shape).to(device) feas,o,nfeas,no = model(inputs_a,inputs_b) feas2,o2,_,_ = model(inputs_a2,inputs_b) fea_mi = f_con(feas,nfeas)+f_con(feas,feas2) p_mi = p_con(o.T,no.T)+p_con(o.T,o2.T) loss = fea_mi + p_mi opt.zero_grad() loss.backward() opt.step() losses += loss.data.tolist() print("Total loss: "+str(round(losses,4))) gc.collect() class contrastiveLoss(torch.nn.Module): def __init__(self, batch_size, temperature=0.5): super().__init__() self.batch_size = batch_size self.register_buffer("temperature", torch.tensor(temperature)) self.register_buffer("negatives_mask", (~torch.eye(batch_size * 2, batch_size * 2, dtype=bool)).float()) def forward(self, emb_i, emb_j): # """ # emb_i and emb_j are batches of embeddings, where corresponding indices are pairs # z_i, z_j as per SimCLR paper # """ z_i = F.normalize(emb_i, dim=1,p=2) z_j = F.normalize(emb_j, dim=1,p=2) representations = torch.cat([z_i, z_j], dim=0) similarity_matrix = F.cosine_similarity(representations.unsqueeze(1), representations.unsqueeze(0), dim=2) sim_ij = torch.diag(similarity_matrix, self.batch_size) sim_ji = torch.diag(similarity_matrix, -self.batch_size) positives = torch.cat([sim_ij, sim_ji], dim=0) nominator = torch.exp(positives / self.temperature) denominator = self.negatives_mask * torch.exp(similarity_matrix / self.temperature) loss_partial = -torch.log(nominator / torch.sum(denominator, dim=1)) loss = torch.sum(loss_partial) / (2 * self.batch_size) return loss def distributionClusters(adata, control_cl, group_cl, perc_cell_to_show=0.1, figsize = (12,8), dpi=100, save_path=None): df = adata.obs.groupby([group_cl, control_cl]).size().unstack() df = df.loc[df.sum(axis=1)/df.values.sum()>=perc_cell_to_show,:] # remove row if group_cluster represents less cells in perc than perc_cell_to_show. df_rel = df df = df.div(df.sum(axis=1), axis=0) df[group_cl] = df.index plt.rcParams["figure.figsize"] = figsize plt.rcParams['figure.dpi'] = dpi plt.rcParams['figure.facecolor'] = '#FFFFFF' df.plot( x = group_cl, kind = 'barh', stacked = True, mark_right = True, ) leg = plt.legend(bbox_to_anchor=(1, 1), loc="upper left") leg.get_frame().set_edgecolor('black') plt.xlabel('perc_'+control_cl, loc='center') for n in df_rel: for i, (pos_x, ab, value) in enumerate(zip(df.iloc[:, :-1].cumsum(1)[n], df[n], df_rel[n])): if (value == 0) | (ab <=0.05): value = '' plt.text(pos_x-ab/2, i, str(value), va='center', ha='center') plt.grid(False) if save_path is not None: plt.savefig(save_path, bbox_inches='tight') return(plt.show()) # ================================================================ # 🔧 OUTPUT HANDLER: salva UMAP e AnnData in /output/ # ================================================================ import os import matplotlib.pyplot as plt import scanpy as sc def save_all_umaps(adata, prefix="output", color_by=None, dpi=300): """ Salva tutte le UMAP presenti in adata.obsm come immagini PNG. Non salva l'AnnData, nessun h5ad viene scritto. """ import os import scanpy as sc import matplotlib.pyplot as plt os.makedirs(prefix, exist_ok=True) print(f"[INFO] Cartella di output: {prefix}") # === Trova tutte le UMAP disponibili === umap_keys = [k for k in adata.obsm.keys() if k.endswith("_umap") or k == "X_umap"] if not umap_keys: print("[WARN] Nessuna UMAP trovata in adata.obsm") return print(f"[INFO] UMAP trovate: {umap_keys}") # === Determina cosa usare come colore === if color_by is None: cluster_cols = [c for c in adata.obs.columns if "clust" in c.lower()] color_by = cluster_cols if cluster_cols else ["n_genes"] elif isinstance(color_by, str): color_by = [color_by] print(f"[INFO] Colorazioni da usare: {color_by}") # === Salva ogni combinazione UMAP × colore === for key in umap_keys: for color in color_by: fig_path = os.path.join(prefix, f"{key}_{color}.png") try: sc.pl.embedding( adata, basis=key, color=color, frameon=False, show=False, ) plt.savefig(fig_path, dpi=dpi, bbox_inches="tight") plt.close() print(f"[OK] Salvata {fig_path}") except Exception as e: print(f"[WARN] Errore nel salvare {fig_path}: {e}") print("[✅] Tutte le UMAP salvate con successo.")