Commit 2c405146, authored Nov 25, 2025 by Ivan Merelli

Initial commit

Changes: 26
scVAR/__pycache__/scVAR.cpython-310.pyc (new file, mode 100644)
File added (binary)
scVAR/scVAR.py (new file, mode 100644)
# import packages
import numpy as np
import pandas as pd
import scanpy as sc
import gc
import torch
import torch.nn.functional as F
import umap
import anndata
from sklearn.preprocessing import StandardScaler
from sklearn.metrics.cluster import adjusted_rand_score
from sklearn.metrics import normalized_mutual_info_score
from sklearn.feature_extraction.text import TfidfTransformer
from scipy import io
from scipy import sparse
import matplotlib.pyplot as plt


### FUNCTION ###
################

def transcriptomicAnalysis(
    path_10x,
    bcode_variants,
    high_var_genes_min_mean=0.015,
    high_var_genes_max_mean=3,
    high_var_genes_min_disp=0.75,
    min_genes=200,
    min_cells=3,
    max_pct_mt=100,
    n_neighbors=20,
    n_pcs=30,
    transcript_key='trans',
    manual_matrix_reading=False,
    force_float64=True,
):
    """
    Preprocess transcriptomic (RNA) data from 10x Genomics format.
    Normalization and scaling now replicate the Muon pipeline exactly:
    normalize_total → log1p → scale(max_value=10)
    """
    import scanpy as sc
    import anndata
    import pandas as pd
    import numpy as np
    import scipy.sparse as sp
    from scipy import io
    from sklearn.preprocessing import StandardScaler

    np.set_printoptions(precision=6, suppress=False, linewidth=140)
    print("[INFO] === TRANSCRIPTOMIC ANALYSIS START ===")

    # === Read RNA data ===
    if manual_matrix_reading:
        rna = path_10x + '/matrix.mtx.gz'
        barcode = path_10x + '/barcodes.tsv.gz'
        feat = path_10x + '/features.tsv.gz'
        M = io.mmread(rna).astype(np.float64 if force_float64 else np.float32)
        B = M.T  # (cells x genes)
        barcode = pd.read_csv(barcode, sep='\t', header=None, names=['bcode'])
        barcode.index = barcode['bcode']
        feat = pd.read_csv(feat, sep='\t', header=None,
                           names=['gene_ids', 'gene_symbol', 'feature_types'])
        feat.index = feat['gene_symbol']
        adata = anndata.AnnData(X=B, obs=barcode, var=feat)
        adata.X = sp.csr_matrix(adata.X)
    else:
        adata = sc.read_10x_mtx(path_10x, var_names='gene_symbols', cache=True)
        adata.X = adata.X.astype(np.float64 if force_float64 else np.float32)

    # === Save a raw copy ===
    if not sp.issparse(adata.X):
        adata.X = sp.csr_matrix(adata.X)
    adata.uns[f"{transcript_key}_raw"] = adata.X.copy()
    adata.uns[f"{transcript_key}_raw_obs_names"] = np.array(adata.obs_names)
    adata.uns[f"{transcript_key}_raw_var_names"] = np.array(adata.var_names)

    print(f"[DEBUG] Raw RNA matrix shape: {adata.X.shape}")
    raw_block = adata.X[:5, :5].toarray().astype(float)
    print(f"[DEBUG] First 5×5 RAW cells (counts):\n{np.array2string(raw_block, precision=5, floatmode='maxprec_equal')}")

    # === Keep only barcodes also present in the variants file ===
    bcode_var = pd.read_csv(bcode_variants, sep='\t', header=None)[0]
    adata = adata[adata.obs.index.isin(bcode_var)].copy()

    # === Filtering and QC ===
    sc.pp.filter_cells(adata, min_genes=min_genes)
    sc.pp.filter_genes(adata, min_cells=min_cells)
    adata.var['mt'] = adata.var_names.str.startswith('MT-')
    sc.pp.calculate_qc_metrics(adata, qc_vars=['mt'], percent_top=None, log1p=False, inplace=True)
    adata = adata[adata.obs.pct_counts_mt < max_pct_mt, :].copy()

    # === Muon-style normalization ===
    sc.pp.normalize_total(adata)
    sc.pp.log1p(adata)
    sc.pp.scale(adata, max_value=10)

    # === Remove any NaN/inf ===
    if sp.issparse(adata.X):
        adata.X.data[np.isnan(adata.X.data)] = 0
        adata.X.data[np.isinf(adata.X.data)] = 0
    else:
        adata.X = np.nan_to_num(adata.X, nan=0.0, posinf=0.0, neginf=0.0)

    print(f"[DEBUG] RNA normalized and scaled (Muon-style). Shape: {adata.X.shape}")
    norm_block = adata.X[:5, :5].toarray().astype(float) if sp.issparse(adata.X) else adata.X[:5, :5].astype(float)
    print(f"[DEBUG] First 5×5 RNA cells (normalized):\n{np.array2string(norm_block, precision=4, floatmode='maxprec_equal')}")

    # === Optional PCA ===
    if n_pcs is not None:
        print(f"[INFO] PCA with {n_pcs} components...")
        sc.tl.pca(adata, n_comps=n_pcs, random_state=42)
        sc.pp.neighbors(adata, n_neighbors=n_neighbors, n_pcs=n_pcs, random_state=42)
        sc.tl.umap(adata, random_state=42)

        # Save the PCA and UMAP embeddings
        adata.obsm[f"{transcript_key}_pca"] = adata.obsm["X_pca"].copy()
        adata.obsm[f"{transcript_key}_umap"] = adata.obsm["X_umap"].copy()

        # Also save in uns (for the later integration step)
        adata.uns[f"{transcript_key}_X"] = adata.obsm["X_pca"].copy()

        # Numeric debug
        print(f"[INFO] Saved adata.uns['{transcript_key}_X'] with shape {adata.uns[f'{transcript_key}_X'].shape}")
        print(f"[DEBUG] First 5×5 PCA cells:\n{np.round(adata.uns[f'{transcript_key}_X'][:5, :5], 4)}")
        print(f"[DEBUG] PCA variance ratio (first 5): {np.round(adata.uns['pca']['variance_ratio'][:5], 4)}")
    else:
        print("[INFO] PCA disabled → using the full normalized matrix.")
        adata.uns[f"{transcript_key}_X"] = adata.X.toarray() if sp.issparse(adata.X) else adata.X

    print(f"[INFO] === TRANSCRIPTOMIC ANALYSIS DONE ({adata.n_obs} cells, {adata.n_vars} genes) ===")
    return adata
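
# Usage sketch for transcriptomicAnalysis (paths are hypothetical, shown only
# to illustrate the expected inputs: a 10x matrix directory plus the one-column
# barcode list that accompanies the variant matrix):
#
#   adata = transcriptomicAnalysis(
#       path_10x="filtered_feature_bc_matrix",
#       bcode_variants="barcodes_var.tsv",
#       n_pcs=30,
#   )
#   # adata.uns['trans_X'] now holds the 30-component PCA used downstream.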

def _prep_embedding_for_training(X):
    """
    Z-score per feature, then L2-normalize per cell. Returns a float32 numpy array.
    """
    import numpy as np
    from sklearn.preprocessing import StandardScaler, normalize

    X = np.asarray(X, dtype=np.float32)
    X = StandardScaler(with_mean=True, with_std=True).fit_transform(X)
    X = normalize(X, norm="l2", axis=1)
    return X
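
# Quick sketch of the helper's contract (array is illustrative): each column
# is z-scored, then each row is rescaled to unit L2 norm.
#
#   X = np.random.rand(100, 20)
#   Z = _prep_embedding_for_training(X)
#   Z.dtype                      # float32
#   np.linalg.norm(Z, axis=1)    # ~1.0 for every row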

class PairedAE(torch.nn.Module):
    """
    Two encoders (RNA/VAR) and two decoders with a shared bottleneck (latent_dim).
    Loss: reconstruction + alignment (cosine) between z_rna and z_var.
    """
    def __init__(self, dim_a, dim_b, latent_dim=32, hidden=256):
        super().__init__()
        self.encoder_a = torch.nn.Sequential(
            torch.nn.Linear(dim_a, hidden),
            torch.nn.ReLU(),
            torch.nn.Linear(hidden, latent_dim),
        )
        self.encoder_b = torch.nn.Sequential(
            torch.nn.Linear(dim_b, hidden),
            torch.nn.ReLU(),
            torch.nn.Linear(hidden, latent_dim),
        )
        self.decoder_a = torch.nn.Sequential(
            torch.nn.Linear(latent_dim, hidden),
            torch.nn.ReLU(),
            torch.nn.Linear(hidden, dim_a),
        )
        self.decoder_b = torch.nn.Sequential(
            torch.nn.Linear(latent_dim, hidden),
            torch.nn.ReLU(),
            torch.nn.Linear(hidden, dim_b),
        )

    def forward(self, xa, xb):
        za = self.encoder_a(xa)
        zb = self.encoder_b(xb)
        ra = self.decoder_a(za)
        rb = self.decoder_b(zb)
        return za, zb, ra, rb
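
# Interface sketch for PairedAE (dimensions are illustrative). Note that
# train_paired_ae below builds its own encoder/decoder stacks and does not
# instantiate this class:
#
#   model = PairedAE(dim_a=30, dim_b=50, latent_dim=32)
#   xa, xb = torch.randn(8, 30), torch.randn(8, 50)
#   za, zb, ra, rb = model(xa, xb)
#   # za, zb: (8, 32) latents; ra: (8, 30), rb: (8, 50) reconstructions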

def train_paired_ae(
    X_a,
    X_b,
    latent_dim=32,
    num_epoch=300,
    lr=1e-3,
    lam_align=0.5,
    lam_recon_a=1.0,
    lam_recon_b=1.0,
    lam_cross=2.0,
    use_self_attention=True,
    verbose=True,
):
    import torch
    import torch.nn as nn
    import torch.optim as optim
    from torch.utils.data import DataLoader, TensorDataset

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # === Encoder/Decoder ===
    def make_encoder(in_dim, latent_dim):
        return nn.Sequential(
            nn.Linear(in_dim, 256),
            nn.BatchNorm1d(256),
            nn.LeakyReLU(0.1),
            nn.Dropout(0.2),
            nn.Linear(256, latent_dim),
            nn.BatchNorm1d(latent_dim),
        )

    def make_decoder(latent_dim, out_dim):
        return nn.Sequential(
            nn.Linear(latent_dim, 256),
            nn.BatchNorm1d(256),
            nn.LeakyReLU(0.1),
            nn.Linear(256, out_dim),
        )

    in_dim_a, in_dim_b = X_a.shape[1], X_b.shape[1]
    enc_a, dec_a = make_encoder(in_dim_a, latent_dim).to(device), make_decoder(latent_dim, in_dim_a).to(device)
    enc_b, dec_b = make_encoder(in_dim_b, latent_dim).to(device), make_decoder(latent_dim, in_dim_b).to(device)

    # === Fusion & optional self-attention ===
    fusion = nn.Sequential(
        nn.Linear(latent_dim, latent_dim),
        nn.LayerNorm(latent_dim),
        nn.LeakyReLU(0.1),
    ).to(device)
    if use_self_attention:
        self_attn = nn.MultiheadAttention(embed_dim=latent_dim, num_heads=4,
                                          dropout=0.1, batch_first=True).to(device)
    else:
        self_attn = None

    opt = optim.Adam(
        list(enc_a.parameters()) + list(dec_a.parameters())
        + list(enc_b.parameters()) + list(dec_b.parameters())
        + list(fusion.parameters())
        + ([] if self_attn is None else list(self_attn.parameters())),
        lr=lr,
    )
    loss_fn = nn.MSELoss()

    Xa, Xb = torch.tensor(X_a, dtype=torch.float32).to(device), torch.tensor(X_b, dtype=torch.float32).to(device)
    loader = DataLoader(TensorDataset(Xa, Xb), batch_size=128, shuffle=True)

    best_loss, patience, patience_counter = float("inf"), 15, 0
    for epoch in range(num_epoch):
        enc_a.train(), dec_a.train(), enc_b.train(), dec_b.train(), fusion.train()
        if self_attn is not None:
            self_attn.train()
        total_loss = 0.0
        for batch_a, batch_b in loader:
            za, zb = enc_a(batch_a), enc_b(batch_b)

            # --- Top-k soft-attention + gating (cross-domain fusion) ---
            za_n = torch.nn.functional.normalize(za, dim=1)
            zb_n = torch.nn.functional.normalize(zb, dim=1)
            sim = torch.matmul(za_n, zb_n.T)  # (B,B) RNA–VAR similarity
            k = min(10, sim.size(1))  # top-k local neighborhood
            vals, idx = torch.topk(sim, k=k, dim=1)
            mask = torch.full_like(sim, float('-inf'))
            mask.scatter_(1, idx, vals)
            attn = torch.softmax(mask / 0.8, dim=1)  # softer temperature
            zb_agg = torch.matmul(attn, zb)  # weighted local fusion

            # Adaptive gating between the RNA latent and the aggregated VAR latent
            gate = torch.sigmoid(((za * zb_agg).sum(dim=1, keepdim=True)) / (latent_dim ** 0.5))
            zf = fusion(gate * za + (1.0 - gate) * zb_agg)  # balanced fusion

            # --- Optional residual self-attention ---
            if self_attn is not None:
                zf_sa, _ = self_attn(zf.unsqueeze(0), zf.unsqueeze(0), zf.unsqueeze(0))
                zf = zf + 0.3 * zf_sa.squeeze(0)  # weighted residual

            # --- Reconstruction losses ---
            ra, rb = dec_a(zf), dec_b(zf)
            loss_recon_a, loss_recon_b = loss_fn(ra, batch_a), loss_fn(rb, batch_b)
            rcross_a, rcross_b = dec_a(zb), dec_b(za)
            loss_cross = 0.5 * (loss_fn(rcross_a, batch_a) + loss_fn(rcross_b, batch_b))

            # --- Alignment (MSE + soft contrastive) ---
            loss_align_mse = loss_fn(za, zb) + 0.5 * (loss_fn(za, zf) + loss_fn(zb, zf))
            za_sim = torch.nn.functional.normalize(za, dim=1)
            zb_sim = torch.nn.functional.normalize(zb, dim=1)
            sim_contrast = torch.mm(za_sim, zb_sim.T) / 0.5  # less rigid temperature
            labels = torch.arange(sim_contrast.size(0)).to(device)
            loss_align_contrast = nn.CrossEntropyLoss()(sim_contrast, labels)
            loss_align = loss_align_mse + 0.4 * loss_align_contrast  # reinforced contrastive term

            # --- Variance and directional alignment ---
            var_loss = torch.var(zf, dim=0).mean()
            cos_sim = torch.nn.functional.cosine_similarity(za, zb, dim=1).mean()

            # --- Total loss ---
            loss = (
                lam_recon_a * loss_recon_a
                + lam_recon_b * loss_recon_b
                + lam_cross * loss_cross
                + lam_align * loss_align
                + 0.001 * var_loss
                - 0.05 * cos_sim
            )
            opt.zero_grad()
            loss.backward()
            opt.step()
            total_loss += loss.item()

        total_loss /= len(loader)
        if verbose and epoch % 10 == 0:
            print(f"[EPOCH {epoch:03d}] loss={total_loss:.5f}")

        # --- Early stopping ---
        if total_loss + 1e-6 < best_loss:
            best_loss, patience_counter = total_loss, 0
        else:
            patience_counter += 1
            if patience_counter > patience:
                if verbose:
                    print(f"[EARLY STOP] epoch={epoch}, loss={total_loss:.5f}")
                break

    # === Inference ===
    enc_a.eval(), enc_b.eval(), fusion.eval()
    if self_attn is not None:
        self_attn.eval()
    with torch.no_grad():
        zA = fusion(enc_a(Xa))
        zB = fusion(enc_b(Xb))
        if self_attn is not None:
            zA = zA + 0.3 * self_attn(zA.unsqueeze(0), zA.unsqueeze(0), zA.unsqueeze(0))[0].squeeze(0)
            zB = zB + 0.3 * self_attn(zB.unsqueeze(0), zB.unsqueeze(0), zB.unsqueeze(0))[0].squeeze(0)
    zA, zB = zA.cpu().numpy(), zB.cpu().numpy()
    return None, zA, zB
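
# Usage sketch (arrays are illustrative): the two inputs must be row-aligned,
# i.e. cell i of X_a and cell i of X_b are the same cell. The first return
# value is a placeholder (always None).
#
#   X_a = np.random.rand(2000, 30).astype(np.float32)   # e.g. RNA PCA
#   X_b = np.random.rand(2000, 50).astype(np.float32)   # e.g. variant PCA
#   _, zA, zB = train_paired_ae(X_a, X_b, latent_dim=32, num_epoch=50)
#   # zA, zB: (2000, 32) fused latent embeddings for the two views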

def variantAnalysis(
    adata,
    matrix_path=None,
    bcode_path=None,
    variants_path=None,
    min_cells=5,
    max_cell_fraction=0.95,
    variant_filter_level="norm",
    n_pcs=50,
    variant_rep="muon",  # "muon", "tfidf", "lognorm"
    variant_key="variant",
):
    """
    Preprocess variant (DNA) data — symmetric with transcriptomicAnalysis.
    Saves:
      - adata.uns['variant_raw']     raw matrix
      - adata.uns['variant_X']       numeric embedding (PCA/scaled)
      - adata.obsm['variant_pca']    PCA (identical to uns['variant_X'])
      - adata.obsm['variant_umap']   2D embedding for QC
    """
    import scanpy as sc
    import numpy as np
    import pandas as pd
    from scipy import sparse, io
    from sklearn.decomposition import PCA, TruncatedSVD
    from sklearn.preprocessing import normalize
    from sklearn.feature_extraction.text import TfidfTransformer
    import umap

    print("[INFO] === VARIANT ANALYSIS START ===")
    print(f"[INFO] Reading file: {matrix_path}")

    # --- Read the matrix ---
    var_mtx = io.mmread(matrix_path)
    X = sparse.csr_matrix(var_mtx)
    barcodes = pd.read_csv(bcode_path, sep="\t", header=None)[0].astype(str).values
    variants = pd.read_csv(variants_path, sep="\t", header=None)[0].astype(str).values

    # --- Fix orientation ---
    if X.shape[0] == len(variants) and X.shape[1] == len(barcodes):
        print(f"[WARN] Transposing variant matrix {X.shape} → expected (cells × variants)")
        X = X.T
    elif X.shape[1] != len(variants):
        print(f"[WARN] Unusual dimensions {X.shape}, transposing to be safe")
        X = X.T
    print(f"[INFO] Variant matrix loaded → {X.shape[0]} cells × {X.shape[1]} variants")

    # === RAW block debug ===
    raw_block = X[:5, :5].toarray()
    print(f"[DEBUG] First 5×5 RAW cells (counts):\n{np.round(raw_block, 2)}")

    # --- Alignment with RNA ---
    rna_barcodes = np.array([b.split('-')[0] for b in adata.obs_names])
    var_barcodes = np.array([b.split('-')[0] for b in barcodes])
    common = np.intersect1d(var_barcodes, rna_barcodes)
    if len(common) == 0:
        raise ValueError("[ERROR] No common barcodes between RNA and VAR.")
    order_idx = np.array([np.where(var_barcodes == b)[0][0]
                          for b in rna_barcodes if b in var_barcodes])
    X = X[order_idx, :]
    barcodes = barcodes[order_idx]
    print(f"[INFO] Cells shared with RNA: {len(barcodes)}")

    # --- Save RAW data ---
    adata.uns[f"{variant_key}_raw"] = X.copy()
    adata.uns[f"{variant_key}_raw_obs_names"] = barcodes
    adata.uns[f"{variant_key}_raw_var_names"] = variants

    # === Choose the representation mode ===
    rep = variant_rep.lower().strip()
    print(f"[INFO] Selected representation: {rep.upper()}")

    # ===============================
    # MUON / BINARY (default)
    # ===============================
    if rep in ["muon", "binary"]:
        print("[INFO] Using Muon-style pipeline (binarization + scaling + PCA + UMAP)")
        X_bin = (X > 0).astype(float)
        dna = sc.AnnData(X_bin)
        dna.obs_names = barcodes
        dna.var_names = variants

        # Filter out variants that are too rare or too common
        sc.pp.filter_genes(dna, min_cells=min_cells)
        freq = np.array(dna.X.sum(axis=0)).flatten() / dna.n_obs
        keep = freq < max_cell_fraction
        dna = dna[:, keep].copy()
        print(f"[INFO] Variants kept: {dna.n_vars} / {len(variants)}")

        # Scale
        sc.pp.scale(dna, max_value=10)
        norm_block = dna.X[:5, :5].toarray() if sparse.issparse(dna.X) else dna.X[:5, :5]
        print(f"[DEBUG] First 5×5 DNA cells (scaled):\n{np.round(norm_block, 4)}")

        # PCA and UMAP
        sc.tl.pca(dna, n_comps=n_pcs, random_state=42)
        sc.pp.neighbors(dna, n_pcs=min(30, n_pcs), random_state=42)
        sc.tl.umap(dna, random_state=42)

        adata.obsm[f"{variant_key}_pca"] = dna.obsm["X_pca"].copy()
        adata.uns[f"{variant_key}_X"] = dna.obsm["X_pca"].copy()
        adata.obsm[f"{variant_key}_umap"] = dna.obsm["X_umap"].copy()
        print(f"[DEBUG] PCA variance ratio (first 5): {np.round(dna.uns['pca']['variance_ratio'][:5], 4)}")
        print(f"[DEBUG] First 5×5 PCA cells:\n{np.round(adata.uns[f'{variant_key}_X'][:5, :5], 4)}")

    # ===============================
    # TF-IDF
    # ===============================
    elif rep == "tfidf":
        print("[INFO] Using TF-IDF representation + L2 norm")
        tfidf = TfidfTransformer(norm=None, use_idf=True, smooth_idf=True)
        X_rep = tfidf.fit_transform(X)
        X_rep = normalize(X_rep, norm="l2", axis=1)
        rep_block = X_rep[:5, :5].toarray()
        print(f"[DEBUG] First 5×5 TFIDF cells:\n{np.round(rep_block, 4)}")
        svd = TruncatedSVD(n_components=n_pcs, random_state=42)
        embedding = svd.fit_transform(X_rep)
        adata.uns[f"{variant_key}_X"] = embedding.astype(np.float32)
        adata.obsm[f"{variant_key}_pca"] = embedding.astype(np.float32)
        reducer = umap.UMAP(n_neighbors=10, min_dist=0.05, random_state=42)
        adata.obsm[f"{variant_key}_umap"] = reducer.fit_transform(embedding)

    # ===============================
    # LOGNORM
    # ===============================
    elif rep == "lognorm":
        print("[INFO] Using log1p-normalized representation")
        lib = np.asarray(X.sum(axis=1)).ravel()
        lib[lib == 0] = 1.0
        # .tocsr() so that the result supports slicing below (multiply returns COO)
        X_norm = X.multiply(1.0 / lib[:, None]).tocsr()
        X_norm.data = np.log1p(X_norm.data)
        rep_block = X_norm[:5, :5].toarray()
        print(f"[DEBUG] First 5×5 LOGNORM cells:\n{np.round(rep_block, 4)}")
        svd = TruncatedSVD(n_components=n_pcs, random_state=42)
        embedding = svd.fit_transform(X_norm)
        adata.uns[f"{variant_key}_X"] = embedding.astype(np.float32)
        adata.obsm[f"{variant_key}_pca"] = embedding.astype(np.float32)
        reducer = umap.UMAP(n_neighbors=10, min_dist=0.05, random_state=42)
        adata.obsm[f"{variant_key}_umap"] = reducer.fit_transform(embedding)

    # ===============================
    # ERROR
    # ===============================
    else:
        raise ValueError(f"[ERROR] variant_rep '{variant_rep}' not recognized. Use 'muon', 'tfidf' or 'lognorm'.")

    print(f"[INFO] DNA PCA shape: {adata.uns[f'{variant_key}_X'].shape}")
    print(f"[INFO] === VARIANT ANALYSIS DONE ({X.shape[0]} cells, {X.shape[1]} variants) ===")
    return adata
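
# Usage sketch (the file names mirror those written by
# sc_gen/convert_to_scvar_format.py further down in this commit):
#
#   adata = variantAnalysis(
#       adata,
#       matrix_path="consensus_filtered_markdup.mtx",
#       bcode_path="barcodes_var.tsv",
#       variants_path="variants_filtered_markdup.txt",
#       variant_rep="muon",
#   )
#   # adata.uns['variant_X'] now holds the n_pcs-dimensional DNA embedding.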

def calcOmicsClusters(adata, omic_key="int", res=0.5, n_neighbors=15, n_pcs=None):
    """
    Compute Leiden clustering on the integrated embedding (e.g. 'int').
    Logic:
      - if n_cells < 1000  → uses the concatenated PCA saved in adata.uns['int_X'] (Muon-style)
      - if n_cells >= 1000 → uses the AE embedding saved in adata.uns['int_X']
    No UMAP involved. Deterministic clustering (igraph).
    """
    import scanpy as sc
    import numpy as np

    n_cells = adata.n_obs
    print(f"\n[INFO] === CALC OMICS CLUSTERS ({omic_key}) ===")
    print(f"[INFO] Number of cells: {n_cells}")

    # ==========================================================
    # 1️⃣ Retrieve the integrated embedding (concatenated PCA or AE)
    # ==========================================================
    if f"{omic_key}_X" not in adata.uns:
        raise ValueError(f"[ERROR] uns['{omic_key}_X'] missing — run omicsIntegration() first.")
    X = np.asarray(adata.uns[f"{omic_key}_X"], dtype=np.float32)
    adata.obsm[f"{omic_key}_X"] = X.copy()

    if n_cells < 1000:
        print("[INFO] Small dataset → using concatenated PCA (Muon-style)")
    else:
        print("[INFO] Large dataset → using autoencoder embedding (AE-style)")
    print(f"[INFO] Embedding loaded from adata.uns['{omic_key}_X'] ({X.shape[1]} dimensions)")

    # 🔍 Numeric debug
    debug_block = np.round(X[:5, :5], 4)
    print(f"[DEBUG] First 5×5 embedding cells:\n{debug_block}")
    print(f"[DEBUG] Sum of first 5 rows: {[round(X[i, :].sum(), 4) for i in range(min(5, X.shape[0]))]}")
    print(f"[DEBUG] Mean embedding variance: {X.var():.4f}")

    # (Optional) check consistency with obsm["X_concat_pca"]
    if "X_concat_pca" in adata.obsm:
        same = np.allclose(X[:5, :5], adata.obsm["X_concat_pca"][:5, :5])
        print(f"[DEBUG] Consistent with obsm['X_concat_pca']: {same}")

    # ==========================================================
    # 2️⃣ Build the neighbors graph
    # ==========================================================
    dims = X.shape[1]
    sc.pp.neighbors(
        adata,
        n_neighbors=n_neighbors,
        n_pcs=min(n_pcs or dims, dims),
        use_rep=f"{omic_key}_X",
        key_added=f"{omic_key}_neighbors"
    )
    conn = adata.obsp[f"{omic_key}_neighbors_connectivities"]
    dist = adata.obsp[f"{omic_key}_neighbors_distances"]
    print(f"[DEBUG] Neighbors graph: {conn.shape}, mean_conn={conn.mean():.6f}, mean_dist={dist.mean():.6f}")

    # ==========================================================
    # 3️⃣ Deterministic Leiden (igraph)
    # ==========================================================
    key = f"{omic_key}_clust_{res:.2f}".rstrip("0").rstrip(".")
    sc.tl.leiden(
        adata,
        resolution=res,
        key_added=key,
        flavor="igraph",
        n_iterations=2,
        directed=False,
        random_state=0,  # deterministic
        neighbors_key=f"{omic_key}_neighbors",
    )
    n_clusters = adata.obs[key].nunique()
    adata.obs[key] = adata.obs[key].astype("category")
    adata.uns[key] = {"resolution": res, "n_clusters": n_clusters}
    print(f"[INFO] → Leiden completed on {omic_key} (res={res}) → {n_clusters} clusters")
    print("[INFO] === CLUSTERING DONE ===")
    return adata
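
# Usage sketch: run after omicsIntegration() so that uns['int_X'] exists.
#
#   adata = calcOmicsClusters(adata, omic_key="int", res=0.5)
#   adata.obs["int_clust_0.5"].value_counts()   # cluster sizes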

def weightsInit(m):
    if isinstance(m, torch.nn.Linear):
        # in-place initializer (the non-underscore variant is deprecated)
        torch.nn.init.xavier_uniform_(m.weight.data)
        m.bias.data.zero_()

def omicsIntegration(
    adata,
    transcript_key="trans",
    variant_key="variant",
    integration_key="int",
    latent_dim=32,
    num_epoch=300,
    lam_align=2.0,
    lam_recon_a=1.0,
    lam_recon_b=1.0,
    lam_cross=5.0,
    seed=42,
    res=None,
    balance_var=False,
):
    """
    RNA+VAR integration:
      - If n_cells < 1000  → concatenated PCA (Muon-style, no rescaling)
      - If n_cells >= 1000 → asymmetric autoencoder (RNA=teacher, VAR=student)
    If res is given, also computes the integrated Leiden clustering at that resolution.
    """
    import numpy as np
    import scanpy as sc

    print("[INFO] === OMICS INTEGRATION START ===")

    # --- check input ---
    assert transcript_key + "_X" in adata.uns, f"Missing adata.uns['{transcript_key}_X']"
    assert variant_key + "_X" in adata.uns, f"Missing adata.uns['{variant_key}_X']"
    X_a = np.asarray(adata.uns[transcript_key + "_X"], dtype=np.float32)
    X_b = np.asarray(adata.uns[variant_key + "_X"], dtype=np.float32)
    assert X_a.shape[0] == X_b.shape[0], "RNA/VAR rows (cells) must match"
    n_cells = X_a.shape[0]

    # --- balance variances (only if requested) ---
    if balance_var:
        print("[INFO] Variance balancing enabled (X /= std).")
        X_a /= np.std(X_a)
        X_b /= np.std(X_b)
    else:
        print("[INFO] No rescaling: using plain PCA (Muon-style).")

    # ==========================================================
    # === BLOCK 1: CONCATENATED PCA (small dataset < 1000) ===
    # ==========================================================
    if n_cells < 1000:
        print(f"[INFO] n_cells={n_cells} < 1000 → using Muon-style concatenated PCA")
        from sklearn.decomposition import PCA

        # --- Reuse the PCAs already stored in adata.uns if available ---
        if f"{transcript_key}_X" in adata.uns and f"{variant_key}_X" in adata.uns:
            print(f"[INFO] Using existing PCAs from adata.uns['{transcript_key}_X'] and adata.uns['{variant_key}_X']")
            pca_rna = np.asarray(adata.uns[f"{transcript_key}_X"])
            pca_var = np.asarray(adata.uns[f"{variant_key}_X"])

            # === Detailed debug ===
            print(f"[DEBUG] PCA RNA shape: {pca_rna.shape} | PCA VAR shape: {pca_var.shape}")
            print(f"[DEBUG] First 5×5 PCA RNA:\n{np.round(pca_rna[:5, :5], 4)}")
            print(f"[DEBUG] First 5×5 PCA VAR:\n{np.round(pca_var[:5, :5], 4)}")
            print(f"[DEBUG] Mean/variance PCA RNA: {np.mean(pca_rna):.4f} / {np.var(pca_rna):.4f}")
            print(f"[DEBUG] Mean/variance PCA VAR: {np.mean(pca_var):.4f} / {np.var(pca_var):.4f}")
        else:
            print("[WARN] PCAs not found in adata.uns — recomputing them locally from the raw X matrices.")
            from sklearn.decomposition import PCA
            X_a = np.asarray(adata.uns[f"{transcript_key}_raw"].todense()
                             if hasattr(adata.uns[f"{transcript_key}_raw"], "todense")
                             else adata.uns[f"{transcript_key}_raw"])
            X_b = np.asarray(adata.uns[f"{variant_key}_raw"].todense()
                             if hasattr(adata.uns[f"{variant_key}_raw"], "todense")
                             else adata.uns[f"{variant_key}_raw"])
            n_comp = min(50, X_a.shape[1], X_b.shape[1])
            pca_rna = PCA(n_components=n_comp, random_state=seed).fit_transform(X_a)
            pca_var = PCA(n_components=n_comp, random_state=seed).fit_transform(X_b)
            adata.uns[f"{transcript_key}_X"] = pca_rna
            adata.uns[f"{variant_key}_X"] = pca_var
            print(f"[INFO] Created new local PCAs: RNA={pca_rna.shape}, VAR={pca_var.shape}")
            print(f"[DEBUG] First 5×5 PCA RNA (new):\n{np.round(pca_rna[:5, :5], 4)}")
            print(f"[DEBUG] First 5×5 PCA VAR (new):\n{np.round(pca_var[:5, :5], 4)}")

        # --- plain PCA concatenation ---
        X_concat = np.concatenate([pca_rna, pca_var], axis=1)
        adata.uns[integration_key + "_X"] = X_concat.copy()
        adata.obsm["X_concat_pca"] = X_concat.copy()
        print(f"[INFO] PCA concatenation completed: {X_concat.shape}")
        concat_block = X_concat[:5, :5]
        print(f"[DEBUG] First 5×5 CONCATENATED cells:\n{np.round(concat_block, 4)}")
        print(f"[DEBUG] Sum of first 5 concatenated rows: {[round(X_concat[i, :].sum(), 4) for i in range(5)]}")
        print(f"[DEBUG] PCA variance RNA / VAR: {pca_rna.var():.3f} / {pca_var.var():.3f}")

        # --- Integrated neighbors + UMAP ---
        sc.pp.neighbors(adata, use_rep="X_concat_pca", key_added=f"{integration_key}_neighbors")
        # explicitly use the integrated graph (not any pre-existing RNA-only one)
        sc.tl.umap(adata, neighbors_key=f"{integration_key}_neighbors")
        adata.obsm[f"{integration_key}_umap"] = adata.obsm["X_umap"].copy()

        # --- Optional Leiden ---
        if res is not None:
            key = f"{integration_key}_clust_{res:.2f}".rstrip("0").rstrip(".")
            print(f"[INFO] Computing integrated Leiden for res={res}")
            sc.tl.leiden(
                adata,
                resolution=res,
                key_added=key,
                neighbors_key=f"{integration_key}_neighbors",
                flavor="igraph",
                n_iterations=2,
                directed=False,
                random_state=seed,
            )
            n_clusters = adata.obs[key].nunique()
            adata.obs[key] = adata.obs[key].astype("category")
            adata.uns[key] = {"resolution": res, "n_clusters": n_clusters}
            print(f"[INFO] → Created {key} ({n_clusters} clusters)")

        print("[INFO] === MUON-STYLE INTEGRATION COMPLETED ===")
        return adata

    # ========================================================
    # === BLOCK 2: ASYMMETRIC AUTOENCODER (large dataset) ===
    # ========================================================
    else:
        print(f"[INFO] n_cells={n_cells} >= 1000 → using asymmetric autoencoder (RNA=teacher, VAR=student)")
        from sklearn.preprocessing import StandardScaler
        import umap

        # --- Per-modality normalization (kept for reference, unused) ---
        scaler_a = StandardScaler(with_mean=True, with_std=True)
        scaler_b = StandardScaler(with_mean=True, with_std=True)
        # X_a_scaled = scaler_a.fit_transform(X_a)
        # X_b_scaled = scaler_b.fit_transform(X_b)

        # Combined normalization (RNA+VAR together)
        scaler = StandardScaler(with_mean=True, with_std=True)
        X_concat = np.concatenate([X_a, X_b], axis=1)
        X_concat = scaler.fit_transform(X_concat)
        # split RNA and VAR again, now normalized on the same scale
        X_a_scaled = X_concat[:, :X_a.shape[1]]
        X_b_scaled = X_concat[:, X_a.shape[1]:]

        # --- Train the improved AE ---
        _, zA, zB = train_paired_ae(
            X_a_scaled,
            X_b_scaled,
            latent_dim=latent_dim,
            num_epoch=num_epoch,
            lam_align=lam_align,
            lam_recon_a=lam_recon_a,
            lam_recon_b=lam_recon_b,
            lam_cross=lam_cross,
            verbose=True,
        )

        # --- Detailed embedding diagnostics ---
        import numpy as np
        print("\n[DEBUG] === AE DIAGNOSTIC ===")
        print(f"Shape zA: {zA.shape}, zB: {zB.shape}")
        print(f"Variance zA: {np.mean(np.var(zA, axis=0)):.6f}")
        print(f"Variance zB: {np.mean(np.var(zB, axis=0)):.6f}")
        print(f"Mean zA/zB covariance: {np.mean(np.cov(np.concatenate([zA, zB], axis=1).T)):.6f}")
        print(f"SimAE (dot): {float(np.mean(np.sum(zA * zB, axis=1))):.6f}")

        # Check correlations between latent features
        corr = np.corrcoef(zA.T)
        print(f"Mean absolute correlation between zA features: {np.mean(np.abs(corr[np.triu_indices_from(corr, 1)])):.4f}")

        # Value ranges
        print(f"zA value range: {zA.min():.4f} → {zA.max():.4f}")
        print(f"zB value range: {zB.min():.4f} → {zB.max():.4f}")
        print("[DEBUG] First 5x5 zA cells:\n", np.round(zA[:5, :5], 4))
        print("[DEBUG] First 5x5 zB cells:\n", np.round(zB[:5, :5], 4))
        print("================================\n")

        simAE = float(np.mean(np.sum(zA * zB, axis=1)))
        zAE = 0.5 * (zA + zB)
        print(f"[DEBUG] Mean variance zA: {zA.var():.4f}, zB: {zB.var():.4f}, zAE: {zAE.var():.4f}")
        print(f"[DEBUG] First 5×5 AE embedding cells:\n{np.round(zAE[:5, :5], 4)}")
        if zAE.var() < 1e-3:
            print("[WARN] Very low AE variance — the embedding has probably collapsed")

        # --- Save the integrated result ---
        adata.uns[integration_key + "_X"] = zAE.astype(np.float32)
        adata.uns[integration_key + "_metrics"] = {
            "simAE": simAE,
            "latent_dim": int(latent_dim),
            "num_epoch": int(num_epoch),
            "lam_align": float(lam_align),
            "lam_recon_a": float(lam_recon_a),
            "lam_recon_b": float(lam_recon_b),
            "lam_cross": float(lam_cross),
        }

        # --- UMAP and optional clustering ---
        um = umap.UMAP(n_neighbors=15, min_dist=0.05, random_state=seed).fit_transform(zAE)
        adata.obsm[f"{integration_key}_umap"] = um
        if res is not None:
            key = f"{integration_key}_clust_{res:.2f}".rstrip("0").rstrip(".")
            print(f"[INFO] Computing integrated Leiden for res={res}")
            sc.pp.neighbors(adata, use_rep=f"{integration_key}_X", key_added=f"{integration_key}_neighbors")
            sc.tl.leiden(
                adata,
                resolution=res,
                key_added=key,
                neighbors_key=f"{integration_key}_neighbors",
                flavor="igraph",
                n_iterations=2,
                directed=False,
                random_state=seed,
            )
            n_clusters = adata.obs[key].nunique()
            adata.obs[key] = adata.obs[key].astype("category")
            adata.uns[key] = {"resolution": res, "n_clusters": n_clusters}
            print(f"[INFO] → Created {key} ({n_clusters} clusters)")

        print(f"[INFO] AE similarity={simAE:.3f}")
        print("[INFO] === AUTOENCODER INTEGRATION COMPLETED ===")
        return adata
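
# End-to-end sketch of the intended pipeline (paths are hypothetical; the
# branch between PCA concatenation and the autoencoder is chosen automatically
# from the number of cells):
#
#   adata = transcriptomicAnalysis("filtered_feature_bc_matrix", "barcodes_var.tsv")
#   adata = variantAnalysis(adata,
#                           matrix_path="consensus_filtered_markdup.mtx",
#                           bcode_path="barcodes_var.tsv",
#                           variants_path="variants_filtered_markdup.txt")
#   adata = omicsIntegration(adata, res=0.5)
#   adata = calcOmicsClusters(adata, omic_key="int", res=0.5)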

class pairedIntegration(torch.nn.Module):
    def __init__(self, input_dim_a=100, input_dim_b=100, clf_out=10):
        super(pairedIntegration, self).__init__()
        self.input_dim_a = input_dim_a
        self.input_dim_b = input_dim_b
        self.clf_out = clf_out
        self.encoder_a = torch.nn.Sequential(
            torch.nn.Linear(self.input_dim_a, 1000),
            torch.nn.BatchNorm1d(1000),
            torch.nn.ReLU(),
            torch.nn.Linear(1000, 512),
            torch.nn.BatchNorm1d(512),
            torch.nn.ReLU(),
            torch.nn.Linear(512, 128),
            torch.nn.BatchNorm1d(128),
            torch.nn.ReLU())
        self.encoder_b = torch.nn.Sequential(
            torch.nn.Linear(self.input_dim_b, 1000),
            torch.nn.BatchNorm1d(1000),
            torch.nn.ReLU(),
            torch.nn.Linear(1000, 512),
            torch.nn.BatchNorm1d(512),
            torch.nn.ReLU(),
            torch.nn.Linear(512, 128),
            torch.nn.BatchNorm1d(128),
            torch.nn.ReLU())
        self.clf = torch.nn.Sequential(
            torch.nn.Linear(128, self.clf_out),
            torch.nn.Softmax(dim=1))
        self.feature = torch.nn.Sequential(
            torch.nn.Linear(128, 32))

    def forward(self, x_a, x_b):
        out_a = self.encoder_a(x_a)
        f_a = self.feature(out_a)
        y_a = self.clf(out_a)
        out_b = self.encoder_b(x_b)
        f_b = self.feature(out_b)
        y_b = self.clf(out_b)
        return f_a, y_a, f_b, y_b

def pairedIntegrationTrainer(X_a, X_b, model, batch_size=512, num_epoch=5, f_temp=0.1, p_temp=1.0):
    device = torch.device("cpu")
    f_con = contrastiveLoss(batch_size=batch_size, temperature=f_temp)
    p_con = contrastiveLoss(batch_size=model.clf_out, temperature=p_temp)
    opt = torch.optim.SGD(model.parameters(), lr=0.01, momentum=0.9, weight_decay=5e-4)
    for k in range(num_epoch):
        model.to(device)
        n = X_a.shape[0]
        r = np.random.permutation(n)
        X_train_a = X_a[r, :]
        X_tensor_A = torch.tensor(X_train_a).float()
        X_train_b = X_b[r, :]
        X_tensor_B = torch.tensor(X_train_b).float()
        losses = 0
        for j in range(n // batch_size):
            # two independently noised views of the same RNA batch
            inputs_a = X_tensor_A[j * batch_size:(j + 1) * batch_size, :].to(device)
            inputs_a2 = inputs_a + torch.normal(0, 1, inputs_a.shape).to(device)
            inputs_a = inputs_a + torch.normal(0, 1, inputs_a.shape).to(device)
            inputs_b = X_tensor_B[j * batch_size:(j + 1) * batch_size, :].to(device)
            inputs_b = inputs_b + torch.normal(0, 1, inputs_b.shape).to(device)
            feas, o, nfeas, no = model(inputs_a, inputs_b)
            feas2, o2, _, _ = model(inputs_a2, inputs_b)
            fea_mi = f_con(feas, nfeas) + f_con(feas, feas2)
            p_mi = p_con(o.T, no.T) + p_con(o.T, o2.T)
            loss = fea_mi + p_mi
            opt.zero_grad()
            loss.backward()
            opt.step()
            losses += loss.data.tolist()
        print("Total loss: " + str(round(losses, 4)))
        gc.collect()

class contrastiveLoss(torch.nn.Module):
    def __init__(self, batch_size, temperature=0.5):
        super().__init__()
        self.batch_size = batch_size
        self.register_buffer("temperature", torch.tensor(temperature))
        self.register_buffer("negatives_mask",
                             (~torch.eye(batch_size * 2, batch_size * 2, dtype=bool)).float())

    def forward(self, emb_i, emb_j):
        # emb_i and emb_j are batches of embeddings, where corresponding indices
        # are pairs (z_i, z_j) as per the SimCLR paper
        z_i = F.normalize(emb_i, dim=1, p=2)
        z_j = F.normalize(emb_j, dim=1, p=2)
        representations = torch.cat([z_i, z_j], dim=0)
        similarity_matrix = F.cosine_similarity(representations.unsqueeze(1),
                                                representations.unsqueeze(0), dim=2)
        sim_ij = torch.diag(similarity_matrix, self.batch_size)
        sim_ji = torch.diag(similarity_matrix, -self.batch_size)
        positives = torch.cat([sim_ij, sim_ji], dim=0)
        nominator = torch.exp(positives / self.temperature)
        denominator = self.negatives_mask * torch.exp(similarity_matrix / self.temperature)
        loss_partial = -torch.log(nominator / torch.sum(denominator, dim=1))
        loss = torch.sum(loss_partial) / (2 * self.batch_size)
        return loss
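
# Sanity-check sketch for the NT-Xent loss above (tensors are illustrative).
# Both batches must contain exactly `batch_size` rows, because the positive
# pairs are read off the ±batch_size diagonals of the (2B × 2B) similarity
# matrix:
#
#   crit = contrastiveLoss(batch_size=16, temperature=0.5)
#   loss = crit(torch.randn(16, 32), torch.randn(16, 32))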

def distributionClusters(adata, control_cl, group_cl, perc_cell_to_show=0.1,
                         figsize=(12, 8), dpi=100, save_path=None):
    df = adata.obs.groupby([group_cl, control_cl]).size().unstack()
    # drop a row if its group cluster holds a smaller fraction of all cells than perc_cell_to_show
    df = df.loc[df.sum(axis=1) / df.values.sum() >= perc_cell_to_show, :]
    df_rel = df
    df = df.div(df.sum(axis=1), axis=0)
    df[group_cl] = df.index
    plt.rcParams["figure.figsize"] = figsize
    plt.rcParams['figure.dpi'] = dpi
    plt.rcParams['figure.facecolor'] = '#FFFFFF'
    df.plot(
        x=group_cl,
        kind='barh',
        stacked=True,
        mark_right=True,
    )
    leg = plt.legend(bbox_to_anchor=(1, 1), loc="upper left")
    leg.get_frame().set_edgecolor('black')
    plt.xlabel('perc_' + control_cl, loc='center')
    for n in df_rel:
        for i, (pos_x, ab, value) in enumerate(zip(df.iloc[:, :-1].cumsum(1)[n], df[n], df_rel[n])):
            if (value == 0) | (ab <= 0.05):
                value = ''
            plt.text(pos_x - ab / 2, i, str(value), va='center', ha='center')
    plt.grid(False)
    if save_path is not None:
        plt.savefig(save_path, bbox_inches='tight')
    return plt.show()
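
# Usage sketch: compare an integrated clustering against an RNA-only one
# (the obs column names are illustrative; any two categorical columns work):
#
#   distributionClusters(adata, control_cl="trans_clust_0.5",
#                        group_cl="int_clust_0.5", save_path="clusters.png")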

# ================================================================
# 🔧 OUTPUT HANDLER: saves the UMAP figures to the output folder
# ================================================================
import os
import matplotlib.pyplot as plt
import scanpy as sc

def save_all_umaps(adata, prefix="output", color_by=None, dpi=300):
    """
    Saves every UMAP present in adata.obsm as a PNG image.
    Does not save the AnnData; no h5ad file is written.
    """
    import os
    import scanpy as sc
    import matplotlib.pyplot as plt

    os.makedirs(prefix, exist_ok=True)
    print(f"[INFO] Output folder: {prefix}")

    # === Find all available UMAPs ===
    umap_keys = [k for k in adata.obsm.keys() if k.endswith("_umap") or k == "X_umap"]
    if not umap_keys:
        print("[WARN] No UMAP found in adata.obsm")
        return
    print(f"[INFO] UMAPs found: {umap_keys}")

    # === Decide what to use as color ===
    if color_by is None:
        cluster_cols = [c for c in adata.obs.columns if "clust" in c.lower()]
        color_by = cluster_cols if cluster_cols else ["n_genes"]
    elif isinstance(color_by, str):
        color_by = [color_by]
    print(f"[INFO] Colorings to use: {color_by}")

    # === Save every UMAP × color combination ===
    for key in umap_keys:
        for color in color_by:
            fig_path = os.path.join(prefix, f"{key}_{color}.png")
            try:
                sc.pl.embedding(
                    adata,
                    basis=key,
                    color=color,
                    frameon=False,
                    show=False,
                )
                plt.savefig(fig_path, dpi=dpi, bbox_inches="tight")
                plt.close()
                print(f"[OK] Saved {fig_path}")
            except Exception as e:
                print(f"[WARN] Error saving {fig_path}: {e}")

    print("[✅] All UMAPs saved successfully.")
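
# Usage sketch: writes one PNG per (UMAP basis × color) combination into the
# given folder ("figures" is an arbitrary example):
#
#   save_all_umaps(adata, prefix="figures", color_by="int_clust_0.5")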
sc_gen/convert_to_scvar_format.py (new file, mode 100644)
#### extract data in 10x format for scVAR
import pandas as pd
from scipy import io, sparse
import os

def convert_expression(expr_csv, out_dir):
    print("🔄 Converting expression data...")
    expr = pd.read_csv(expr_csv, index_col=0)

    # Save matrix in Matrix Market format
    expr_sparse = sparse.csr_matrix(expr.values)
    io.mmwrite(os.path.join(out_dir, "matrix.mtx"), expr_sparse)

    # Save barcodes
    barcodes = pd.Series(expr.columns)
    barcodes.to_csv(os.path.join(out_dir, "clean_barcodes.txt"), index=False, header=False)

    # Save features.tsv in 10X format: gene_id\tgene_name\tGene Expression
    features = pd.DataFrame({
        "gene_id": expr.index,
        "gene_name": expr.index,
        "feature_type": "Gene Expression"
    })
    features.to_csv(os.path.join(out_dir, "features.tsv"), sep="\t", index=False, header=False)
    print("✅ Expression conversion complete!")

def convert_variants(variant_csv, out_dir):
    print("🔄 Converting variant data...")
    var = pd.read_csv(variant_csv, index_col=0)

    # Save matrix in Matrix Market format
    var_sparse = sparse.csr_matrix(var.values)
    io.mmwrite(os.path.join(out_dir, "consensus_filtered_markdup.mtx"), var_sparse)

    # Save barcodes
    barcodes = pd.Series(var.columns)
    barcodes.to_csv(os.path.join(out_dir, "barcodes_var.tsv"), index=False, header=False)

    # Save variant list
    variants = pd.Series(var.index)
    variants.to_csv(os.path.join(out_dir, "variants_filtered_markdup.txt"), index=False, header=False)
    print("✅ Variant conversion complete!")

def main():
    import argparse
    parser = argparse.ArgumentParser(
        description="Convert synthetic scRNA-seq and variant data into scVAR-compatible format."
    )
    parser.add_argument("--expression_csv", required=True, help="Path to expression.csv")
    parser.add_argument("--variant_csv", required=True, help="Path to variants.csv")
    parser.add_argument("--output_dir", required=True, help="Directory to save scVAR-compatible files")
    args = parser.parse_args()

    os.makedirs(args.output_dir, exist_ok=True)
    convert_expression(args.expression_csv, args.output_dir)
    convert_variants(args.variant_csv, args.output_dir)

if __name__ == "__main__":
    main()
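
# Example invocation (input files as produced by
# generate_synthetic_singlecell.py below):
#
#   python convert_to_scvar_format.py \
#       --expression_csv expression.csv \
#       --variant_csv variants.csv \
#       --output_dir scvar_input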
sc_gen/generate_synthetic_singlecell.py (new file, mode 100644)
import argparse
import numpy as np
import pandas as pd
import random

def generate_data(N, F, V, T, G, expr_sparsity, var_sparsity, expr_error, var_error,
                  label_noise, expr_out, var_out, truth_out, cell_metadata_out):
    np.random.seed(42)
    random.seed(42)

    # Define labels
    cell_types = [f"CellType_{i}" for i in range(T)]
    genotypes = [f"Genotype_{i}" for i in range(G)]
    cells = [f"Cell_{i}" for i in range(N)]

    # Assign true labels
    true_cell_types = np.random.choice(cell_types, size=N)
    celltype_to_genotypes = {
        ct: sorted(random.sample(genotypes, random.randint(1, G)))
        for ct in cell_types
    }
    true_genotypes = [random.choice(celltype_to_genotypes[ct]) for ct in true_cell_types]

    # Corrupt labels if needed
    noisy_cell_types = true_cell_types.copy()
    noisy_genotypes = true_genotypes.copy()
    n_noisy = int(label_noise * N)
    noisy_indices = np.random.choice(N, n_noisy, replace=False)
    for i in noisy_indices:
        wrong_ct = list(set(cell_types) - {true_cell_types[i]})
        noisy_cell_types[i] = random.choice(wrong_ct)
        wrong_genos = list(set(genotypes) - set(celltype_to_genotypes[true_cell_types[i]]))
        if wrong_genos:
            noisy_genotypes[i] = random.choice(wrong_genos)

    # ---------------------------------------------
    # Generate reference transcriptome per CellType
    # ---------------------------------------------
    expr_reference = {
        ct: np.random.lognormal(mean=1.0, sigma=0.5, size=F)
        for ct in cell_types
    }

    # Per-gene noise variance for each cell type (gene-specific)
    gene_noise_factors = np.random.uniform(0.5, 1.5, size=F)

    # Generate expression matrix with noise per cell
    expr_data = np.zeros((F, N))  # genes x cells
    for i in range(N):
        ct = noisy_cell_types[i]
        profile = expr_reference[ct]
        noise_std = expr_error * gene_noise_factors
        expr_data[:, i] = np.random.normal(loc=profile, scale=noise_std)

    # Apply sparsity (dropouts): zero out expr_sparsity% of genes in each cell
    for i in range(N):
        zero_indices = np.random.choice(F, size=int(expr_sparsity * F), replace=False)
        expr_data[zero_indices, i] = 0.0
    expr_data[np.abs(expr_data) < 1e-10] = 0.0  # clean -0

    expr_df = pd.DataFrame(expr_data, index=[f"Gene_{i}" for i in range(F)], columns=cells)
    expr_df.to_csv(expr_out)

    # ------------------------------------------
    # Generate reference genotype profiles
    # ------------------------------------------
    variant_reference = {g: np.random.binomial(1, 0.1, size=V) for g in genotypes}

    # Generate the variant matrix for each cell from its genotype
    var_data = np.zeros((V, N), dtype=int)
    for i in range(N):
        g = noisy_genotypes[i]
        profile = variant_reference[g].copy()
        # Introduce random changes: replace var_error * V entries with 0 or 1
        n_changes = int(var_error * V)
        indices_to_modify = np.random.choice(V, size=n_changes, replace=False)
        profile[indices_to_modify] = np.random.randint(0, 2, size=n_changes)
        # Apply sparsity (dropouts): zero out var_sparsity% of variants
        n_zeros = int(var_sparsity * V)
        zero_indices = np.random.choice(V, size=n_zeros, replace=False)
        profile[zero_indices] = 0
        var_data[:, i] = profile

    var_df = pd.DataFrame(var_data, index=[f"Variant_{i}" for i in range(V)], columns=cells)
    var_df.to_csv(var_out)

    # ------------------------------------------
    # Save CellType <-> Genotype mapping (truth)
    # ------------------------------------------
    ct_gt = pd.DataFrame(
        [(ct, g) for ct, gs in celltype_to_genotypes.items() for g in gs],
        columns=["CellType", "Genotype"]
    )
    ct_gt.to_csv(truth_out, index=False)

    # ------------------------------------------
    # Save per-cell metadata
    # ------------------------------------------
    metadata_df = pd.DataFrame({
        "Cell": cells,
        "CellType": noisy_cell_types,
        "Genotype": noisy_genotypes,
        "TrueCellType": true_cell_types,
        "TrueGenotype": true_genotypes
    })
    metadata_df.to_csv(cell_metadata_out, index=False)

    print(f"✔ Expression matrix saved to: {expr_out}")
    print(f"✔ Variant matrix saved to: {var_out}")
    print(f"✔ CellType–Genotype mapping saved to: {truth_out}")
    print(f"✔ Cell metadata (true + noisy labels) saved to: {cell_metadata_out}")

if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="Generate synthetic single-cell data with realistic transcriptome and variant noise."
    )
    parser.add_argument("--cells", type=int, required=True)
    parser.add_argument("--genes", type=int, required=True)
    parser.add_argument("--variants", type=int, required=True)
    parser.add_argument("--celltypes", type=int, required=True)
    parser.add_argument("--genotypes", type=int, required=True)
    parser.add_argument("--expr_out", type=str, required=True)
    parser.add_argument("--var_out", type=str, required=True)
    parser.add_argument("--truth_out", type=str, required=True)
    parser.add_argument("--cell_metadata_out", type=str, required=True)
    # Sparsity = dropout-like effect
    parser.add_argument("--expr_sparsity", type=float, default=0.3,
                        help="Fraction of genes set to 0 in each cell (simulates dropout)")
    parser.add_argument("--var_sparsity", type=float, default=0.1,
                        help="Fraction of variants set to 0 in each cell (simulates missed calls)")
    # Noise = deviation from the reference profile
    parser.add_argument("--expr_error", type=float, default=0.3,
                        help="Amount of expression noise per gene per cell")
    parser.add_argument("--var_error", type=float, default=0.3,
                        help="Fraction of variants changed arbitrarily per cell")
    # Label noise = wrong labels
    parser.add_argument("--label_noise", type=float, default=0.0,
                        help="Fraction of cells with incorrect celltype/genotype labels")
    args = parser.parse_args()

    generate_data(
        N=args.cells,
        F=args.genes,
        V=args.variants,
        T=args.celltypes,
        G=args.genotypes,
        expr_sparsity=args.expr_sparsity,
        var_sparsity=args.var_sparsity,
        expr_error=args.expr_error,
        var_error=args.var_error,
        label_noise=args.label_noise,
        expr_out=args.expr_out,
        var_out=args.var_out,
        truth_out=args.truth_out,
        cell_metadata_out=args.cell_metadata_out,
    )
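
# Example invocation (parameter values are illustrative):
#
#   python generate_synthetic_singlecell.py \
#       --cells 2000 --genes 1000 --variants 500 --celltypes 5 --genotypes 3 \
#       --expr_out expression.csv --var_out variants.csv \
#       --truth_out truth.csv --cell_metadata_out metadata.csv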
setup.py (new file, mode 100644)
import setuptools

with open("README.md", "r", encoding="utf-8") as fh:
    long_description = fh.read()

setuptools.setup(
    name="scVAR",
    version="0.0.2",
    author="Ivan Merelli",
    author_email="ivan.merelli@itb.cnr.it",
    description="A tool to integrate genomics and transcriptomics in scRNA-seq data.",
    long_description=long_description,
    long_description_content_type="text/markdown",
    packages=setuptools.find_packages(include=['scVAR', 'scVAR.*']),
    classifiers=[
        "Programming Language :: Python :: 3",
        "License :: OSI Approved :: MIT License",
        "Operating System :: OS Independent",
    ],
    install_requires=[
        'numpy',
        'pandas',
        'scanpy',
        'torch',
        'umap-learn',  # PyPI distribution that provides the `umap` module (plain 'umap' is a different project)
        'leidenalg',
        'igraph',
        'anndata',
        'scikit-learn',
        'scipy',
        'matplotlib'
    ],
    python_requires='>=3.10',
)
tests (new symlink, mode 120000)
../tests