Browse code

[U] Update to 1.7.4

Giulia Pais authored on 04/10/2022 12:52:48
Showing 39 changed files

... ...
@@ -12,3 +12,4 @@
12 12
 ^Design$
13 13
 ^sample_reports$
14 14
 ^man-roxygen$
15
+^Meta$
... ...
@@ -39,3 +39,5 @@ sample_reports
39 39
 README.Rmd
40 40
 NEWS.Rmd
41 41
 pkgdown
42
+/doc/
43
+/Meta/
... ...
@@ -1,6 +1,6 @@
1 1
 Package: ISAnalytics
2 2
 Title: Analyze gene therapy vector insertion sites data identified from genomics next generation sequencing reads for clonal tracking studies
3
-Version: 1.7.3
3
+Version: 1.7.4
4 4
 Date: 2020-07-03
5 5
 Authors@R: c(
6 6
   person(given = "Andrea",
... ...
@@ -31,7 +31,6 @@ Imports:
31 31
     purrr,
32 32
     rlang,
33 33
     tibble,
34
-    BiocParallel,
35 34
     stringr,
36 35
     fs,
37 36
     lubridate,
... ...
@@ -50,12 +49,11 @@ Imports:
50 49
     shiny,
51 50
     shinyWidgets,
52 51
     datamods,
53
-    bslib,
54
-    doParallel
52
+    bslib
55 53
 Encoding: UTF-8
56 54
 LazyData: false
57 55
 Roxygen: list(markdown = TRUE)
58
-RoxygenNote: 7.2.0
56
+RoxygenNote: 7.2.1
59 57
 Suggests: 
60 58
     testthat,
61 59
     covr,
... ...
@@ -80,8 +78,13 @@ Suggests:
80 78
     eulerr,
81 79
     openxlsx,
82 80
     jsonlite,
83
-    pheatmap,
84
-    progressr
81
+    pheatmap
82
+Enhances:
83
+    BiocParallel,
84
+    progressr,
85
+    future,
86
+    doFuture,
87
+    foreach
85 88
 VignetteBuilder: knitr
86 89
 RdMacros: 
87 90
     lifecycle
... ...
@@ -29,6 +29,7 @@ export(default_meta_agg)
29 29
 export(default_rec_agg_lambdas)
30 30
 export(default_report_path)
31 31
 export(default_stats)
32
+export(enable_progress_bars)
32 33
 export(export_ISA_settings)
33 34
 export(fisher_scatterplot)
34 35
 export(gene_frequency_fisher)
... ...
@@ -88,12 +89,6 @@ import(ggplot2)
88 89
 import(lifecycle)
89 90
 import(shiny)
90 91
 import(shinyWidgets)
91
-importFrom(BiocParallel,MulticoreParam)
92
-importFrom(BiocParallel,SnowParam)
93
-importFrom(BiocParallel,bplapply)
94
-importFrom(BiocParallel,bpok)
95
-importFrom(BiocParallel,bpstop)
96
-importFrom(BiocParallel,bptry)
97 92
 importFrom(Rcapture,closedp.0)
98 93
 importFrom(Rcapture,closedp.bc)
99 94
 importFrom(data.table,"%chin%")
... ...
@@ -2,6 +2,20 @@
2 2
 title: "NEWS"
3 3
 output: github_document
4 4
 ---
5
+# ISAnalytics 1.7.4 (2022-10-04)
6
+
7
+## VISIBLE USER CHANGES
8
+
9
+* Progress bars for long processing functions are now implemented via the 
10
+package `progressr`, added a wrapper function for fast enabling progress bars,
11
+`enable_progress_bars()`
12
+* Introduced logging for issues in `HSC_population_size_estimate()` - 
13
+signals eventual problems in computing estimates and why
14
+
15
+## BUG FIXES AND MINOR CHANGES
16
+
17
+* Fixed minor bugs and typos
18
+
5 19
 # ISAnalytics 1.7.3 (2022-06-17)
6 20
 
7 21
 ## BUG FIXES AND MINOR CHANGES
... ...
@@ -1,6 +1,20 @@
1 1
 NEWS
2 2
 ================
3 3
 
4
+# ISAnalytics 1.7.4 (2022-10-04)
5
+
6
+## VISIBLE USER CHANGES
7
+
8
+-   Progress bars for long processing functions are now implemented via
9
+    the package `progressr`, added a wrapper function for fast enabling
10
+    progress bars, `enable_progress_bars()`
11
+-   Introduced logging for issues in `HSC_population_size_estimate()` -
12
+    signals eventual problems in computing estimates and why
13
+
14
+## BUG FIXES AND MINOR CHANGES
15
+
16
+-   Fixed minor bugs and typos
17
+
4 18
 # ISAnalytics 1.7.3 (2022-06-17)
5 19
 
6 20
 ## BUG FIXES AND MINOR CHANGES
... ...
@@ -1116,6 +1116,7 @@ CIS_grubbs <- function(x,
1116 1116
 #' @param max_workers Maximum number of parallel workers. If `NULL` the
1117 1117
 #' maximum number of workers is calculated automatically.
1118 1118
 #'
1119
+#'
1119 1120
 #' @return A list with results and optionally missing genes info
1120 1121
 #' @export
1121 1122
 #'
... ...
@@ -1153,12 +1154,6 @@ CIS_grubbs_overtime <- function(x,
1153 1154
             c(group, timepoint_col)[!c(group, timepoint_col) %in% colnames(x)]
1154 1155
         ))
1155 1156
     }
1156
-    # Manage workers
1157
-    if (.Platform$OS.type == "windows" & is.null(max_workers)) {
1158
-        max_workers <- BiocParallel::snowWorkers()
1159
-    } else if (.Platform$OS.type != "windows" & is.null(max_workers)) {
1160
-        max_workers <- BiocParallel::multicoreWorkers()
1161
-    }
1162 1157
     stopifnot(is.logical(as_df) || is.character(as_df))
1163 1158
     result_as_df <- if (is.logical(as_df)) {
1164 1159
         if (as_df[1] == TRUE) {
... ...
@@ -1194,7 +1189,9 @@ CIS_grubbs_overtime <- function(x,
1194 1189
         purrr::set_names(keys_g)
1195 1190
 
1196 1191
     # --- Calculate for each group and each tp
1197
-    tp_slice_cis <- function(df, timepoint_col, res_checks, result_as_df) {
1192
+    tp_slice_cis <- function(df, timepoint_col,
1193
+    res_checks, result_as_df,
1194
+    progress) {
1198 1195
         tmp <- df %>%
1199 1196
             dplyr::group_by(dplyr::across(dplyr::all_of(timepoint_col)))
1200 1197
         g_keys <- tmp %>%
... ...
@@ -1210,7 +1207,8 @@ CIS_grubbs_overtime <- function(x,
1210 1207
                 threshold_alpha = res_checks$threshold_alpha,
1211 1208
                 gene_symbol_col = res_checks$gene_symbol_col,
1212 1209
                 gene_strand_col = res_checks$gene_strand_col,
1213
-                chr_col = res_checks$chrom_col, locus_col = res_checks$locus_col,
1210
+                chr_col = res_checks$chrom_col,
1211
+                locus_col = res_checks$locus_col,
1214 1212
                 strand_col = res_checks$strand_col
1215 1213
             ))
1216 1214
         } else {
... ...
@@ -1220,43 +1218,35 @@ CIS_grubbs_overtime <- function(x,
1220 1218
                 threshold_alpha = res_checks$threshold_alpha,
1221 1219
                 gene_symbol_col = res_checks$gene_symbol_col,
1222 1220
                 gene_strand_col = res_checks$gene_strand_col,
1223
-                chr_col = res_checks$chrom_col, locus_col = res_checks$locus_col,
1221
+                chr_col = res_checks$chrom_col,
1222
+                locus_col = res_checks$locus_col,
1224 1223
                 strand_col = res_checks$strand_col
1225 1224
             ) %>% dplyr::mutate(!!timepoint_col := .y))
1226 1225
         }
1226
+        if (!is.null(progress)) {
1227
+            progress()
1228
+        }
1227 1229
         return(res)
1228 1230
     }
1229
-
1230
-    if (.Platform$OS.type == "windows") {
1231
-        p <- BiocParallel::SnowParam(
1232
-            stop.on.error = TRUE,
1233
-            progressbar = getOption("ISAnalytics.verbose", TRUE),
1234
-            tasks = length(split),
1235
-            workers = max_workers
1236
-        )
1237
-    } else {
1238
-        p <- BiocParallel::MulticoreParam(
1239
-            stop.on.error = TRUE,
1240
-            progressbar = getOption("ISAnalytics.verbose", TRUE),
1241
-            tasks = length(split),
1242
-            workers = max_workers
1243
-        )
1244
-    }
1245
-    cis_overtime <- BiocParallel::bplapply(split,
1246
-        FUN = tp_slice_cis,
1247
-        timepoint_col = timepoint_col,
1248
-        res_checks = res_checks,
1249
-        result_as_df = result_as_df,
1250
-        BPPARAM = p
1231
+    cis_overtime <- .execute_map_job(
1232
+        data_list = split,
1233
+        fun_to_apply = tp_slice_cis,
1234
+        fun_args = list(
1235
+            timepoint_col = timepoint_col,
1236
+            result_as_df = result_as_df,
1237
+            res_checks = res_checks
1238
+        ),
1239
+        stop_on_error = TRUE,
1240
+        max_workers = max_workers
1251 1241
     )
1252
-    BiocParallel::bpstop(p)
1253
-
1254 1242
     if (result_as_df == 1) {
1255 1243
         cis_overtime <- purrr::map2_df(
1256
-            cis_overtime, names(cis_overtime),
1244
+            cis_overtime$res, names(cis_overtime$res),
1257 1245
             ~ .x %>%
1258 1246
                 dplyr::mutate(group = .y)
1259 1247
         )
1248
+    } else {
1249
+        cis_overtime <- cis_overtime$res
1260 1250
     }
1261 1251
     result$cis <- cis_overtime
1262 1252
     return(result)
... ...
@@ -437,16 +437,16 @@ import_association_file <- function(path,
437 437
         {
438 438
             if ("checks_env" %in% names(dots) &&
439 439
                 rlang::is_environment(dots$checks_env)) {
440
-              rlang::env_bind(
441
-                dots$checks_env,
442
-                parsing_prob = parsing_problems,
443
-                dates_prob = date_problems,
444
-                col_prob = col_probs,
445
-                crit_nas = crit_nas,
446
-                fs_align = checks,
447
-                iss_stats = import_stats_rep,
448
-                iss_stats_miss = missing_stats_rep
449
-              )
440
+                rlang::env_bind(
441
+                    dots$checks_env,
442
+                    parsing_prob = parsing_problems,
443
+                    dates_prob = date_problems,
444
+                    col_prob = col_probs,
445
+                    crit_nas = crit_nas,
446
+                    fs_align = checks,
447
+                    iss_stats = import_stats_rep,
448
+                    iss_stats_miss = missing_stats_rep
449
+                )
450 450
             }
451 451
             .produce_report("asso_file",
452 452
                 params = list(
... ...
@@ -951,6 +951,17 @@ import_parallel_Vispa2Matrices <- function(association_file,
951 951
         launch_params[["patterns"]] <- patterns
952 952
         launch_params[["matching_opt"]] <- matching_option
953 953
     }
954
+    if ("checks_env" %in% names(dots_args) &&
955
+        is.environment(dots_args$checks_env)) {
956
+        rlang::env_bind(
957
+            dots_args$checks_env,
958
+            launch_params = launch_params,
959
+            set_vars = list(proj_col = proj_col, pool_col = pool_col),
960
+            files_found = files_found,
961
+            files_imp = fimported,
962
+            annot_prob = annotation_problems
963
+        )
964
+    }
954 965
     withCallingHandlers(
955 966
         {
956 967
             .produce_report("matrix_imp",
... ...
@@ -192,8 +192,10 @@
192 192
 .process_af_for_gen <- function(af) {
193 193
     association_file <- if (!is.data.frame(af) && all(af == "default")) {
194 194
         af_sym <- "association_file"
195
-        utils::data(list = af_sym, envir = rlang::current_env(),
196
-                    package = "ISAnalytics")
195
+        utils::data(
196
+            list = af_sym, envir = rlang::current_env(),
197
+            package = "ISAnalytics"
198
+        )
197 199
         rlang::eval_tidy(rlang::sym(af_sym))
198 200
     } else {
199 201
         af
... ...
@@ -227,8 +229,10 @@
227 229
     integration_matrices <- if (!is.data.frame(matrices) &&
228 230
         all(matrices == "default")) {
229 231
         matrices_sym <- "integration_matrices"
230
-        utils::data(list = matrices_sym, envir = rlang::current_env(),
231
-                    package = "ISAnalytics")
232
+        utils::data(
233
+            list = matrices_sym, envir = rlang::current_env(),
234
+            package = "ISAnalytics"
235
+        )
232 236
         rlang::eval_tidy(rlang::sym(matrices_sym))
233 237
     } else {
234 238
         matrices
... ...
@@ -266,7 +270,8 @@
266 270
     # Then separate by pool
267 271
     sep_matrices <- purrr::map(sep_matrices, ~ {
268 272
         tmp <- .x %>%
269
-            dplyr::group_by(dplyr::across(dplyr::all_of(tag_list$vispa_concatenate)))
273
+            dplyr::group_by(dplyr::across(
274
+              dplyr::all_of(tag_list$vispa_concatenate)))
270 275
         pool_names <- tmp %>%
271 276
             dplyr::group_keys() %>%
272 277
             dplyr::pull(dplyr::all_of(tag_list$vispa_concatenate))
... ...
@@ -776,6 +781,115 @@
776 781
     return(result)
777 782
 }
778 783
 
784
+.check_parallel_packages <- function() {
785
+    if (getOption("ISAnalytics.parallel_processing", default = TRUE) == TRUE) {
786
+        required_parallel_pkgs <- list(
787
+            BiocParallel = "BIOC",
788
+            doFuture = "CRAN",
789
+            future = "CRAN",
790
+            foreach = "CRAN"
791
+        )
792
+        pkgs_present <- purrr::map_lgl(
793
+            names(required_parallel_pkgs),
794
+            ~ requireNamespace(.x, quietly = TRUE)
795
+        )
796
+        if (any(pkgs_present == FALSE)) {
797
+            missing_pkgs <- required_parallel_pkgs[!pkgs_present]
798
+            options(ISAnalytics.parallel_processing = FALSE)
799
+            info_msg <- c(.missing_pkg_error(missing_pkgs),
800
+                i = paste(
801
+                    "Packages for parallel computation are not available,",
802
+                    "switching to default sequential computation",
803
+                    "(certain operations might be slower).",
804
+                    "To re-activate the functionality, install the",
805
+                    "packages and set",
806
+                    "`options(ISAnalytics.parallel_processing = TRUE)`"
807
+                )
808
+            )
809
+            rlang::inform(info_msg)
810
+        }
811
+    }
812
+}
813
+
814
+# Establishes whether a map job can be executed in parallel (multi-threading)
815
+# or needs to be executed sequentially and calls appropriate functions.
816
+#
817
+# Functions in "fun_to_apply" should always have at least 2 args:
818
+# - the data
819
+# - a `progress` arg
820
+# The list of args must contain everything needed by the function except for
821
+# the first argument (passed in data_list) and progress (internally managed)
822
+#' @importFrom purrr map map_lgl
823
+.execute_map_job <- function(data_list,
824
+    fun_to_apply,
825
+    fun_args,
826
+    stop_on_error,
827
+    max_workers = NULL) {
828
+    # Set up progressor if package available
829
+    prog <- if (rlang::is_installed("progressr")) {
830
+        progressr::progressor(steps = length(data_list))
831
+    } else {
832
+        NULL
833
+    }
834
+    .check_parallel_packages()
835
+    if (getOption("ISAnalytics.parallel_processing", default = TRUE) == TRUE) {
836
+        # Manage workers
837
+        if (.Platform$OS.type == "windows" & is.null(max_workers)) {
838
+            max_workers <- BiocParallel::snowWorkers()
839
+        } else if (.Platform$OS.type != "windows" & is.null(max_workers)) {
840
+            max_workers <- BiocParallel::multicoreWorkers()
841
+        }
842
+    }
843
+
844
+    if (getOption("ISAnalytics.parallel_processing", default = TRUE) == TRUE &
845
+        max_workers > 1) {
846
+        # Set up parallel workers
847
+        old_be <- doFuture::registerDoFuture()
848
+        old_plan <- future::plan(future::multisession, workers = max_workers)
849
+        on.exit(
850
+            {
851
+                future::plan(old_plan)
852
+                foreach::setDoPar(fun = old_be$fun,
853
+                                  data = old_be$data, info = old_be$info)
854
+            },
855
+            add = TRUE
856
+        )
857
+        p <- BiocParallel::DoparParam(stop.on.error = stop_on_error)
858
+
859
+        # Execute
860
+        arg_list <- append(fun_args, list(
861
+            X = data_list,
862
+            FUN = fun_to_apply,
863
+            BPPARAM = p,
864
+            progress = prog
865
+        ))
866
+        results <- rlang::exec(
867
+            BiocParallel::bplapply,
868
+            !!!arg_list
869
+        )
870
+        return(list(res = results, mode = "par"))
871
+    }
872
+
873
+    # Sequential
874
+    arg_list <- append(
875
+        list(
876
+            .x = data_list,
877
+            .f = fun_to_apply,
878
+            progress = prog
879
+        ),
880
+        fun_args
881
+    )
882
+    if (stop_on_error == TRUE) {
883
+        results <- rlang::exec(purrr::map, !!!arg_list)
884
+        return(list(res = results, mode = "seq"))
885
+    } else {
886
+        results <- rlang::exec(purrr::safely(purrr::map), !!!arg_list)
887
+        errs <- results$error
888
+        results <- results$result
889
+        return(list(res = results, mode = "seq", err = errs))
890
+    }
891
+}
892
+
779 893
 #### ---- Internals for checks on integration matrices----####
780 894
 
781 895
 # Internal helper function for checking mandatory vars presence in x.
... ...
@@ -997,7 +1111,8 @@
997 1111
         (is.list(additional_cols) & !is.null(names(additional_cols))))
998 1112
     correct_types <- additional_cols %in% c(.types_mapping()$types, "_")
999 1113
     if (any(correct_types == FALSE)) {
1000
-        add_types_err <- c("Unknown column type in specified additional columns",
1114
+        add_types_err <- c(
1115
+          "Unknown column type in specified additional columns",
1001 1116
             x = paste("Types must be in the allowed types"),
1002 1117
             i = paste(
1003 1118
                 "Unknown formats: ",
... ...
@@ -1070,7 +1185,10 @@
1070 1185
     if (is_annotated) {
1071 1186
         id_vars <- c(id_vars, annotation_IS_vars())
1072 1187
     }
1073
-    mt <- function(data, id_vars, sample_col, value_col) {
1188
+    mt <- function(data, id_vars, sample_col, value_col, progress) {
1189
+        if (!is.null(progress)) {
1190
+            progress()
1191
+        }
1074 1192
         data.table::melt.data.table(data,
1075 1193
             id.vars = id_vars,
1076 1194
             variable.name = sample_col,
... ...
@@ -1085,27 +1203,10 @@
1085 1203
     }
1086 1204
     max_workers <- trunc(BiocParallel::snowWorkers() / 3)
1087 1205
     if (call_mode == "INTERNAL" || nrow(df) <= 500000 || max_workers <= 1) {
1088
-        tidy <- mt(df, id_vars, id_col_name, val_col_name)
1206
+        tidy <- mt(df, id_vars, id_col_name, val_col_name, NULL)
1089 1207
     } else if (nrow(df) > 500000 & max_workers > 1) {
1090 1208
         ## - Melt in parallel
1091
-        p <- if (.Platform$OS.type == "windows") {
1092
-            BiocParallel::SnowParam(
1093
-                workers = max_workers,
1094
-                tasks = trunc(nrow(df) / max_workers),
1095
-                progressbar = getOption("ISAnalytics.verbose", TRUE),
1096
-                exportglobals = TRUE,
1097
-                stop.on.error = TRUE
1098
-            )
1099
-        } else {
1100
-            BiocParallel::MulticoreParam(
1101
-                workers = max_workers,
1102
-                tasks = trunc(nrow(df) / max_workers),
1103
-                progressbar = getOption("ISAnalytics.verbose", TRUE),
1104
-                exportglobals = FALSE,
1105
-                stop.on.error = TRUE
1106
-            )
1107
-        }
1108
-        ## - Split in chunks
1209
+        ### - Split in chunks
1109 1210
         chunk_id_vec <- rep(seq_len(max_workers - 1),
1110 1211
             each = trunc(nrow(df) / max_workers)
1111 1212
         )
... ...
@@ -1121,16 +1222,19 @@
1121 1222
             verbose = FALSE,
1122 1223
             keep.by = FALSE
1123 1224
         )
1124
-        tidy_chunks <- BiocParallel::bplapply(
1125
-            X = chunks,
1126
-            FUN = mt,
1127
-            id_vars = id_vars,
1128
-            sample_col = id_col_name,
1129
-            value_col = val_col_name,
1130
-            BPPARAM = p
1225
+        tidy_chunks <- .execute_map_job(
1226
+            data_list = chunks,
1227
+            fun_to_apply = mt,
1228
+            fun_args = list(
1229
+                id_vars = id_vars,
1230
+                sample_col = id_col_name,
1231
+                value_col = val_col_name
1232
+            ),
1233
+            stop_on_error = TRUE,
1234
+            max_workers = max_workers
1131 1235
         )
1132
-        BiocParallel::bpstop(p)
1133
-        tidy <- data.table::rbindlist(tidy_chunks)
1236
+
1237
+        tidy <- data.table::rbindlist(tidy_chunks$res)
1134 1238
     }
1135 1239
     tidy <- tidy[val_col_name > 0]
1136 1240
     ## Transform cols
... ...
@@ -1729,22 +1833,8 @@
1729 1833
     vispa_stats_req <- iss_stats_specs(TRUE) %>%
1730 1834
         dplyr::filter(.data$flag == "required") %>%
1731 1835
         dplyr::pull(.data$names)
1732
-    # Setup parallel workers and import
1733
-    # Register backend according to platform
1734
-    if (.Platform$OS.type == "windows") {
1735
-        p <- BiocParallel::SnowParam(
1736
-            stop.on.error = FALSE,
1737
-            tasks = length(stats_paths$stats_files),
1738
-            progressbar = getOption("ISAnalytics.verbose", TRUE)
1739
-        )
1740
-    } else {
1741
-        p <- BiocParallel::MulticoreParam(
1742
-            stop.on.error = FALSE,
1743
-            tasks = length(stats_paths$stats_files),
1744
-            progressbar = getOption("ISAnalytics.verbose", TRUE)
1745
-        )
1746
-    }
1747
-    FUN <- function(x, req_cols) {
1836
+
1837
+    FUN <- function(x, req_cols, progress) {
1748 1838
         if (is.na(x)) {
1749 1839
             return(NULL)
1750 1840
         }
... ...
@@ -1757,20 +1847,26 @@
1757 1847
         if (ok == TRUE) {
1758 1848
             # Apply column transformations if present
1759 1849
             stats <- .apply_col_transform(stats, iss_stats_specs(TRUE))
1850
+            if (!is.null(progress)) {
1851
+                progress()
1852
+            }
1760 1853
             return(stats)
1761 1854
         } else {
1855
+            if (!is.null(progress)) {
1856
+                progress()
1857
+            }
1762 1858
             return("MALFORMED")
1763 1859
         }
1764 1860
     }
1765
-    stats_dfs <- BiocParallel::bptry(
1766
-        BiocParallel::bplapply(stats_paths$stats_files,
1767
-            FUN,
1768
-            req_cols = vispa_stats_req,
1769
-            BPPARAM = p
1770
-        )
1861
+    stats_dfs <- .execute_map_job(
1862
+        data_list = stats_paths$stats_files,
1863
+        fun_to_apply = FUN,
1864
+        fun_args = list(
1865
+            req_cols = vispa_stats_req
1866
+        ), stop_on_error = FALSE
1771 1867
     )
1772
-    BiocParallel::bpstop(p)
1773
-    correct <- purrr::map_chr(stats_dfs, function(x) {
1868
+
1869
+    correct <- purrr::map_chr(stats_dfs$res, function(x) {
1774 1870
         if (all(is.data.frame(x))) {
1775 1871
             "TRUE"
1776 1872
         } else if (is.null(x)) {
... ...
@@ -1800,7 +1896,7 @@
1800 1896
     })
1801 1897
     stats_paths <- stats_paths %>%
1802 1898
         dplyr::select(-.data$info)
1803
-    stats_dfs <- stats_dfs[stats_paths$Imported]
1899
+    stats_dfs <- stats_dfs$res[stats_paths$Imported]
1804 1900
     # Bind rows in single tibble for all files
1805 1901
     if (purrr::is_empty(stats_dfs)) {
1806 1902
         return(list(stats = NULL, report = stats_paths))
... ...
@@ -2858,12 +2954,12 @@
2858 2954
 # @param workers Number of parallel workers
2859 2955
 # @keywords internal
2860 2956
 #' @importFrom dplyr filter mutate bind_rows distinct
2861
-#' @importFrom BiocParallel SnowParam MulticoreParam bptry bplapply bpstop bpok
2862 2957
 #' @importFrom purrr is_empty reduce
2863 2958
 #
2864 2959
 # @return A single tibble with all data from matrices of same quantification
2865 2960
 # type in tidy format
2866
-.import_type <- function(q_type, files, cluster, prog, import_matrix_args) {
2961
+.import_type <- function(q_type, files, cluster,
2962
+    progress, import_matrix_args) {
2867 2963
     files <- files %>%
2868 2964
         dplyr::filter(.data$Quantification_type == q_type)
2869 2965
     sample_col_name <- if ("id_col_name" %in% names(import_matrix_args)) {
... ...
@@ -2876,22 +2972,39 @@
2876 2972
             list(path = x),
2877 2973
             arg_list
2878 2974
         ))
2879
-        if (!is.null(prog)) {
2880
-          prog()
2975
+        if (!is.null(progress)) {
2976
+            progress()
2881 2977
         }
2882 2978
         return(matrix)
2883 2979
     }
2884
-    # Import every file
2885
-    suppressWarnings({
2886
-      matrices <- BiocParallel::bptry(
2887
-          BiocParallel::bplapply(files$Files_chosen,
2888
-              FUN = FUN,
2889
-              arg_list = import_matrix_args,
2890
-              BPPARAM = cluster
2891
-          )
2892
-      )
2893
-    })
2894
-    correct <- BiocParallel::bpok(matrices)
2980
+    if (!is.null(cluster)) {
2981
+        # Import every file
2982
+        suppressWarnings({
2983
+            matrices <- BiocParallel::bptry(
2984
+                BiocParallel::bplapply(files$Files_chosen,
2985
+                    FUN = FUN,
2986
+                    arg_list = import_matrix_args,
2987
+                    BPPARAM = cluster
2988
+                )
2989
+            )
2990
+        })
2991
+        correct <- BiocParallel::bpok(matrices)
2992
+    } else {
2993
+        arg_list <- append(
2994
+            import_matrix_args,
2995
+            list(
2996
+                .x = files$Files_chosen,
2997
+                .f = FUN
2998
+            )
2999
+        )
3000
+        matrices <- rlang::exec(
3001
+            purrr::safely(purrr::map),
3002
+            !!!arg_list
3003
+        )
3004
+        correct <- purrr::map_lgl(matrices$error, ~ is.null(.x))
3005
+        matrices <- matrices$result
3006
+    }
3007
+
2895 3008
     samples_count <- purrr::map2_int(matrices, correct, ~ {
2896 3009
         if (.y) {
2897 3010
             .x %>%
... ...
@@ -2944,27 +3057,52 @@
2944 3057
     q_types <- files_to_import %>%
2945 3058
         dplyr::distinct(.data$Quantification_type) %>%
2946 3059
         dplyr::pull(.data$Quantification_type)
2947
-
2948
-    # Register backend + progressor
2949
-    doParallel::registerDoParallel(workers)
2950
-    p <- BiocParallel::DoparParam(stop.on.error = FALSE)
2951
-    prog <- if (rlang::is_installed("progressr")) {
2952
-      progressr::progressor(steps = nrow(files_to_import))
2953
-    } else {
2954
-      NULL
2955
-    }
2956
-    doParallel::stopImplicitCluster()
2957
-    # Import and merge for every quantification type
2958
-    imported_matrices <- purrr::map(q_types,
2959
-        .f = ~ .import_type(
2960
-            .x,
2961
-            files_to_import,
2962
-            p,
2963
-            prog,
2964
-            import_matrix_args
3060
+    .check_parallel_packages()
3061
+    if (getOption("ISAnalytics.parallel_processing", TRUE) == TRUE) {
3062
+        # Register backend + progressor
3063
+        old_be <- doFuture::registerDoFuture()
3064
+        old_plan <- future::plan(future::multisession, workers = workers)
3065
+        on.exit(
3066
+            {
3067
+                future::plan(old_plan)
3068
+                foreach::setDoPar(fun = old_be$fun,
3069
+                                  data = old_be$data, info = old_be$info)
3070
+            },
3071
+            add = TRUE
2965 3072
         )
2966
-    ) %>%
2967
-        purrr::set_names(q_types)
3073
+        p <- BiocParallel::DoparParam(stop.on.error = FALSE)
3074
+        prog <- if (rlang::is_installed("progressr")) {
3075
+            progressr::progressor(steps = nrow(files_to_import))
3076
+        } else {
3077
+            NULL
3078
+        }
3079
+        # Import and merge for every quantification type
3080
+        imported_matrices <- purrr::map(q_types,
3081
+            .f = ~ .import_type(
3082
+                .x,
3083
+                files_to_import,
3084
+                p,
3085
+                prog,
3086
+                import_matrix_args
3087
+            )
3088
+        ) %>%
3089
+            purrr::set_names(q_types)
3090
+    } else {
3091
+        p <- NULL
3092
+        # Import and merge for every quantification type
3093
+        imported_matrices <- purrr::map(q_types,
3094
+            .f = ~ .import_type(
3095
+                .x,
3096
+                files_to_import,
3097
+                p,
3098
+                prog,
3099
+                import_matrix_args
3100
+            )
3101
+        ) %>%
3102
+            purrr::set_names(q_types)
3103
+    }
3104
+
3105
+
2968 3106
     summary_files <- purrr::map(imported_matrices, ~ .x$imported_files) %>%
2969 3107
         purrr::reduce(dplyr::bind_rows)
2970 3108
     imported_matrices <- purrr::map(imported_matrices, ~ .x$matrix)
... ...
@@ -3153,16 +3291,17 @@
3153 3291
 # @return A named list containing the splitted joined_df for collisions and
3154 3292
 # non-collisions
3155 3293
 .identify_independent_samples <- function(joined, indep_sample_id) {
3156
-  data.table::setDT(joined)
3157
-  data.table::setkeyv(joined, cols = mandatory_IS_vars())
3158
-  vars_to_group <- c(mandatory_IS_vars(), indep_sample_id)
3159
-  joined_red <- joined[, mget(vars_to_group)]
3160
-  temp <- joined_red[, list(N = data.table::uniqueN(.SD)),
3161
-                     keyby = eval(mandatory_IS_vars())]
3162
-  temp <- temp[eval(rlang::sym("N")) > 1, ]
3163
-  non_collisions <- joined[!temp, on = eval(mandatory_IS_vars())]
3164
-  collisions <- dplyr::semi_join(joined, temp, by = mandatory_IS_vars())
3165
-  list(collisions = collisions, non_collisions = non_collisions)
3294
+    data.table::setDT(joined)
3295
+    data.table::setkeyv(joined, cols = mandatory_IS_vars())
3296
+    vars_to_group <- c(mandatory_IS_vars(), indep_sample_id)
3297
+    joined_red <- joined[, mget(vars_to_group)]
3298
+    temp <- joined_red[, list(N = data.table::uniqueN(.SD)),
3299
+        keyby = eval(mandatory_IS_vars())
3300
+    ]
3301
+    temp <- temp[eval(rlang::sym("N")) > 1, ]
3302
+    non_collisions <- joined[!temp, on = eval(mandatory_IS_vars())]
3303
+    collisions <- dplyr::semi_join(joined, temp, by = mandatory_IS_vars())
3304
+    list(collisions = collisions, non_collisions = non_collisions)
3166 3305
 }
3167 3306
 
3168 3307
 # Internal for date discrimination in collision removal.
... ...
@@ -3303,22 +3442,29 @@
3303 3442
     repl_col,
3304 3443
     reads_ratio,
3305 3444
     seqCount_col,
3306
-    ind_sample_key) {
3445
+    ind_sample_key,
3446
+    progress = NULL) {
3307 3447
     current_data <- x[, !mandatory_IS_vars()]
3308 3448
     # Try to discriminate by date
3309 3449
     result <- .discriminate_by_date(current_data, date_col, ind_sample_key)
3310 3450
     if (result$check == TRUE) {
3311
-      current_data <- result$data
3451
+        current_data <- result$data
3312 3452
         coordinates <- x[seq_len(nrow(current_data)), mget(mandatory_IS_vars())]
3313 3453
         res <- cbind(coordinates, current_data)
3454
+        if (!is.null(progress)) {
3455
+            progress()
3456
+        }
3314 3457
         return(list(data = res, reassigned = 1, removed = 0))
3315 3458
     }
3316 3459
     # If first check fails try to discriminate by replicate
3317 3460
     result <- .discriminate_by_replicate(current_data, repl_col, ind_sample_key)
3318 3461
     if (result$check == TRUE) {
3319
-      current_data <- result$data
3462
+        current_data <- result$data
3320 3463
         coordinates <- x[seq_len(nrow(current_data)), mget(mandatory_IS_vars())]
3321 3464
         res <- cbind(coordinates, current_data)
3465
+        if (!is.null(progress)) {
3466
+            progress()
3467
+        }
3322 3468
         return(list(data = res, reassigned = 1, removed = 0))
3323 3469
     }
3324 3470
     # If second check fails try to discriminate by seqCount
... ...
@@ -3327,12 +3473,18 @@
3327 3473
         ind_sample_key
3328 3474
     )
3329 3475
     if (result$check == TRUE) {
3330
-      current_data <- result$data
3476
+        current_data <- result$data
3331 3477
         coordinates <- x[seq_len(nrow(current_data)), mget(mandatory_IS_vars())]
3332 3478
         res <- cbind(coordinates, current_data)
3479
+        if (!is.null(progress)) {
3480
+            progress()
3481
+        }
3333 3482
         return(list(data = res, reassigned = 1, removed = 0))
3334 3483
     }
3335 3484
     # If all check fails remove the integration from all subjects
3485
+    if (!is.null(progress)) {
3486
+        progress()
3487
+    }
3336 3488
     return(list(data = NULL, reassigned = 0, removed = 1))
3337 3489
 }
3338 3490
 
... ...
@@ -3351,48 +3503,28 @@
3351 3503
     max_workers) {
3352 3504
     # Split by IS
3353 3505
     split_data <- collisions %>%
3354
-      dplyr::group_by(dplyr::across(dplyr::all_of(mandatory_IS_vars()))) %>%
3355
-      dplyr::group_split() %>%
3356
-      purrr::map(.f = data.table::setDT)
3357
-    # Manage workers
3358
-    if (.Platform$OS.type == "windows" & is.null(max_workers)) {
3359
-        max_workers <- BiocParallel::snowWorkers()
3360
-    } else if (.Platform$OS.type != "windows" & is.null(max_workers)) {
3361
-        max_workers <- BiocParallel::multicoreWorkers()
3362
-    }
3363
-    # Register backend according to platform
3364
-    if (.Platform$OS.type == "windows") {
3365
-        p <- BiocParallel::SnowParam(
3366
-            stop.on.error = TRUE,
3367
-            progressbar = getOption("ISAnalytics.verbose", TRUE),
3368
-            tasks = length(split_data),
3369
-            workers = max_workers
3370
-        )
3371
-    } else {
3372
-        p <- BiocParallel::MulticoreParam(
3373
-            stop.on.error = TRUE,
3374
-            progressbar = getOption("ISAnalytics.verbose", TRUE),
3375
-            tasks = length(split_data),
3376
-            workers = max_workers
3377
-        )
3378
-    }
3379
-    # For each chunk process collisions in parallel
3380
-    result <- BiocParallel::bplapply(split_data,
3381
-        FUN = .four_step_check,
3382
-        date_col = date_col,
3383
-        repl_col = repl_col,
3384
-        reads_ratio = reads_ratio,
3385
-        seqCount_col = seqCount_col,
3386
-        ind_sample_key = ind_sample_key,
3387
-        BPPARAM = p
3506
+        dplyr::group_by(dplyr::across(dplyr::all_of(mandatory_IS_vars()))) %>%
3507
+        dplyr::group_split() %>%
3508
+        purrr::map(.f = data.table::setDT)
3509
+    result <- .execute_map_job(
3510
+        data_list = split_data,
3511
+        fun_to_apply = .four_step_check,
3512
+        fun_args = list(
3513
+            date_col = date_col,
3514
+            repl_col = repl_col,
3515
+            reads_ratio = reads_ratio,
3516
+            seqCount_col = seqCount_col,
3517
+            ind_sample_key = ind_sample_key
3518
+        ),
3519
+        stop_on_error = TRUE,
3520
+        max_workers = max_workers
3388 3521
     )
3389
-    BiocParallel::bpstop(p)
3390 3522
     # For each element of result extract and bind the 3 components
3391
-    processed_collisions_df <- purrr::map(result, ~ .x$data) %>%
3523
+    processed_collisions_df <- purrr::map(result$res, ~ .x$data) %>%
3392 3524
         purrr::reduce(~ data.table::rbindlist(list(.x, .y)))
3393
-    removed_total <- purrr::map(result, ~ .x$removed) %>%
3525
+    removed_total <- purrr::map(result$res, ~ .x$removed) %>%
3394 3526
         purrr::reduce(sum)
3395
-    reassigned_total <- purrr::map(result, ~ .x$reassigned) %>%
3527
+    reassigned_total <- purrr::map(result$res, ~ .x$reassigned) %>%
3396 3528
         purrr::reduce(sum)
3397 3529
     # Return the summary
3398 3530
     list(
... ...
@@ -3410,42 +3542,42 @@
3410 3542
 #' @importFrom rlang .data
3411 3543
 # @keywords internal
3412 3544
 #
3413
-# @return A tibble with a summary containing for each SubjectID the number of
3545
+# @return A tibble with a summary containing for each indep sample the number of
3414 3546
 # integrations found before and after, the sum of the value of the sequence
3415
-# count for each subject before and after and the corresponding deltas.
3416
-.summary_table <- function(before, after, seqCount_col) {
3547
+# count for each sample before and after and the corresponding deltas.
3548
+.summary_table <- function(before, after, seqCount_col, ind_sample_id) {
3417 3549
     after_matr <- after %>%
3418
-        dplyr::select(dplyr::all_of(colnames(after)), .data$SubjectID)
3550
+        dplyr::select(dplyr::all_of(c(colnames(after), ind_sample_id)))
3419 3551
 
3420 3552
     n_int_after <- after_matr %>%
3421
-        dplyr::group_by(.data$SubjectID) %>%
3553
+        dplyr::group_by(dplyr::across(dplyr::all_of(ind_sample_id))) %>%
3422 3554
         dplyr::tally(name = "count_integrations_after")
3423 3555
     sumReads_after <- after_matr %>%
3424
-        dplyr::group_by(.data$SubjectID) %>%
3556
+        dplyr::group_by(dplyr::across(dplyr::all_of(ind_sample_id))) %>%
3425 3557
         dplyr::summarise(
3426 3558
             sumSeqReads_after = sum(.data[[seqCount_col]]),
3427 3559
             .groups = "drop_last"
3428 3560
         )
3429 3561
     temp_aft <- n_int_after %>% dplyr::left_join(sumReads_after,
3430
-        by = c("SubjectID")
3562
+        by = ind_sample_id
3431 3563
     )
3432 3564
 
3433 3565
     before_matr <- before %>%
3434
-        dplyr::select(dplyr::all_of(colnames(after)), .data$SubjectID)
3566
+        dplyr::select(dplyr::all_of(c(colnames(after), ind_sample_id)))
3435 3567
     n_int_before <- before_matr %>%
3436
-        dplyr::group_by(.data$SubjectID) %>%
3568
+        dplyr::group_by(dplyr::across(dplyr::all_of(ind_sample_id))) %>%
3437 3569
         dplyr::tally(name = "count_integrations_before")
3438 3570
     sumReads_before <- before_matr %>%
3439
-        dplyr::group_by(.data$SubjectID) %>%
3571
+        dplyr::group_by(dplyr::across(dplyr::all_of(ind_sample_id))) %>%
3440 3572
         dplyr::summarise(
3441 3573
             sumSeqReads_before = sum(.data[[seqCount_col]]),
3442 3574
             .groups = "drop_last"
3443 3575
         )
3444 3576
     temp_bef <- n_int_before %>% dplyr::left_join(sumReads_before,
3445
-        by = c("SubjectID")
3577
+        by = ind_sample_id
3446 3578
     )
3447 3579
     summary <- temp_bef %>%
3448
-        dplyr::left_join(temp_aft, by = c("SubjectID")) %>%
3580
+        dplyr::left_join(temp_aft, by = ind_sample_id) %>%
3449 3581
         dplyr::mutate(
3450 3582
             delta_integrations =
3451 3583
                 .data$count_integrations_before -
... ...
@@ -3561,7 +3693,8 @@
3561 3693
     )
3562 3694
     summary_tbl <- .summary_table(
3563 3695
         before = joined, after = post_joined,
3564
-        seqCount_col = seq_count_col
3696
+        seqCount_col = seq_count_col,
3697
+        ind_sample_id = independent_sample_id
3565 3698
     )
3566 3699
     return(
3567 3700
         list(
... ...
@@ -3842,7 +3975,8 @@
3842 3975
     sample_col,
3843 3976
     req_tags,
3844 3977
     add_col_lambdas,
3845
-    produce_map) {
3978
+    produce_map,
3979
+    progress = NULL) {
3846 3980
     locus_col <- req_tags %>%
3847 3981
         dplyr::filter(.data$tag == "locus") %>%
3848 3982
         dplyr::pull(.data$names)
... ...
@@ -3888,6 +4022,9 @@
3888 4022
                         )
3889 4023
                 ]
3890 4024
             }
4025
+            if (!is.null(progress)) {
4026
+                progress()
4027
+            }
3891 4028
             return(list(recalibrated_matrix = x, map = map_recalibr))
3892 4029
         }
3893 4030
         ## Compute interval for row
... ...
@@ -4030,7 +4167,10 @@
4030 4167
             index <- index + 1
4031 4168
         }
4032 4169
     }
4033
-    list(recalibrated_matrix = x, map = map_recalibr)
4170
+    if (!is.null(progress)) {
4171
+        progress()
4172
+    }
4173
+    return(list(recalibrated_matrix = x, map = map_recalibr))
4034 4174
 }
4035 4175
 
4036 4176
 
... ...
@@ -4571,7 +4711,6 @@
4571 4711
     # Check other parameters
4572 4712
     stopifnot(is.data.frame(genomic_annotation_file) ||
4573 4713
         is.character(genomic_annotation_file))
4574
-    genomic_annotation_file <- genomic_annotation_file[1]
4575 4714
     if (is.character(genomic_annotation_file) &&
4576 4715
         !genomic_annotation_file %in% c("hg19", "mm9")) {
4577 4716
         err_msg <- c("Genomic annotation file unknown",
... ...
@@ -5424,6 +5563,7 @@
5424 5563
     } else {
5425 5564
         c(seqCount_column, fragmentEstimate_column)
5426 5565
     }
5566
+    logger <- list()
5427 5567
     # --- STABLE TIMEPOINTS?
5428 5568
     found_stable <- purrr::map_lgl(
5429 5569
         stable_timepoints,
... ...
@@ -5498,6 +5638,11 @@
5498 5638
         cols_estimate_mcm = cols_estimate_mcm,
5499 5639
         subject = subject, stable = FALSE
5500 5640
     )
5641
+    if (is.null(models0_all) || nrow(models0_all) == 0) {
5642
+        logger <- append(logger, list(
5643
+            paste("Patient", subject, "- Models0 estimates tp 'All' is empty")
5644
+        ))
5645
+    }
5501 5646
     #### For the slice (first stable - last tp)
5502 5647
     models0_stable <- if (!is.null(patient_slice_stable) &&
5503 5648
         ncol(patient_slice_stable) > 1) {
... ...
@@ -5510,6 +5655,11 @@
5510 5655
     } else {
5511 5656
         NULL
5512 5657
     }
5658
+    if (is.null(models0_stable) || nrow(models0_stable) == 0) {
5659
+        logger <- append(logger, list(
5660
+            paste("Patient", subject, "- Models0 estimates tp 'Stable' is empty")
5661
+        ))
5662
+    }
5513 5663
     ## BC models
5514 5664
     ### Estimate abundance without bias correction nor het
5515 5665
     models_bc_all <- .closed_mthchaobc_est(
... ...
@@ -5518,6 +5668,11 @@
5518 5668
         cols_estimate_mcm = cols_estimate_mcm,
5519 5669
         subject = subject, stable = FALSE
5520 5670
     )
5671
+    if (is.null(models_bc_all) || nrow(models_bc_all) == 0) {
5672
+        logger <- append(logger, list(
5673
+            paste("Patient", subject, "- ModelsBC estimates tp 'All' is empty")
5674
+        ))
5675
+    }
5521 5676
     models_bc_stable <- if (!is.null(patient_slice_stable) &&
5522 5677
         ncol(patient_slice_stable) > 1) {
5523 5678
         .closed_mthchaobc_est(
... ...
@@ -5529,6 +5684,11 @@
5529 5684
     } else {
5530 5685
         NULL
5531 5686
     }
5687
+    if (is.null(models_bc_stable) || nrow(models_bc_stable) == 0) {
5688
+        logger <- append(logger, list(
5689
+            paste("Patient", subject, "- ModelsBC estimates tp 'Stable' is empty")
5690
+        ))
5691
+    }
5532 5692
     ## Consecutive timepoints
5533 5693
     ### Model m0 bc
5534 5694
     estimate_consecutive_m0 <- if (ncol(matrix_desc) > 1) {
... ...
@@ -5540,6 +5700,15 @@
5540 5700
     } else {
5541 5701
         NULL
5542 5702
     }
5703
+    if (is.null(estimate_consecutive_m0) ||
5704
+        nrow(estimate_consecutive_m0) == 0) {
5705
+        logger <- append(logger, list(
5706
+            paste(
5707
+                "Patient", subject,
5708
+                "- Models0 estimates tp 'Consecutive' is empty"
5709
+            )
5710
+        ))
5711
+    }
5543 5712
     ### Model Mth
5544 5713
     estimate_consecutive_mth <- if (stable_tps & ncol(matrix_desc) > 2) {
5545 5714
         # - Note: pass the whole matrix, not only stable slice
... ...
@@ -5551,13 +5720,22 @@
5551 5720
     } else {
5552 5721
         NULL
5553 5722
     }
5723
+    if (is.null(estimate_consecutive_mth) ||
5724
+        nrow(estimate_consecutive_mth) == 0) {
5725
+        logger <- append(logger, list(
5726
+            paste(
5727
+                "Patient", subject,
5728
+                "- ModelsMth estimates tp 'Consecutive' is empty"
5729
+            )
5730
+        ))
5731
+    }
5554 5732
     results <- models0_all %>%
5555 5733
         dplyr::bind_rows(models0_stable) %>%
5556 5734
         dplyr::bind_rows(models_bc_all) %>%
5557 5735
         dplyr::bind_rows(models_bc_stable) %>%
5558 5736
         dplyr::bind_rows(estimate_consecutive_m0) %>%
5559 5737
         dplyr::bind_rows(estimate_consecutive_mth)
5560
-    results
5738
+    return(list(est = results, logger = logger))
5561 5739
 }
5562 5740
 
5563 5741
 #' @importFrom Rcapture closedp.0
... ...
@@ -5756,13 +5934,15 @@
5756 5934
     tissue_type,
5757 5935
     annotation_cols,
5758 5936
     subj_col,
5759
-    stable_timepoints) {
5937
+    stable_timepoints,
5938
+    progress = NULL) {
5760 5939
     # --- RE-AGGREGATION - by cell type, tissue and time point
5761 5940
     val_cols <- if (!is.null(fragmentEstimate_column)) {
5762 5941
         c(seqCount_column, fragmentEstimate_column)
5763 5942
     } else {
5764 5943
         seqCount_column
5765 5944
     }
5945
+    logger <- NULL
5766 5946
     re_agg <- aggregate_values_by_key(
5767 5947
         x = df,
5768 5948
         association_file = metadata,
... ...
@@ -5827,7 +6007,14 @@
5827 6007
     ### If selected patient does not have at least 3 distinct timepoints
5828 6008
     ### simply return (do not consider for population estimate)
5829 6009
     if (length(unique(patient_slice[[timepoint_column]])) <= 2) {
5830
-        return(NULL)
6010
+        logger <- list(
6011
+            paste(
6012
+                "Patient", df[[subj_col]][1],
6013
+                "- Data contains less than 2 time points,",
6014
+                "returning NULL"
6015
+            )
6016
+        )
6017
+        return(list(est = NULL, log = logger))
5831 6018
     }
5832 6019
     estimate <- .estimate_pop(
5833 6020
         df = patient_slice,
... ...
@@ -5839,7 +6026,10 @@
5839 6026
         stable_timepoints = stable_timepoints,
5840 6027
         tissue_col = tissue_col
5841 6028
     )
5842
-    return(estimate)
6029
+    if (!is.null(progress)) {
6030
+        progress()
6031
+    }
6032
+    return(list(est = estimate$est, log = estimate$logger))
5843 6033
 }
5844 6034
 
5845 6035
 #---- USED IN : is_sharing ----
... ...
@@ -6496,22 +6686,10 @@
6496 6686
                 )
6497 6687
             )
6498 6688
         }
6499
-        if (.Platform$OS.type == "windows") {
6500
-            p <- BiocParallel::SnowParam(
6501
-                tasks = length(common_names),
6502
-                progressbar = getOption("ISAnalytics.verbose", TRUE),
6503
-                exportglobals = TRUE
6504
-            )
6505
-        } else {
6506
-            p <- BiocParallel::MulticoreParam(
6507
-                tasks = length(common_names),
6508
-                progressbar = getOption("ISAnalytics.verbose", TRUE),
6509
-                exportglobals = FALSE
6510
-            )
6511
-        }
6689
+
6512 6690
         FUN <- function(subj,
6513 6691
     ref, sel, ref_key, sel_key,
6514
-    tp_col) {
6692
+    tp_col, progress) {
6515 6693
             ref_df <- ref[[subj]]
6516 6694
             sel_df <- sel[[subj]]
6517 6695
             ref_key_min <- ref_key[ref_key != tp_col]
... ...
@@ -6545,16 +6723,24 @@
6545 6723
                         convert = TRUE
6546 6724
                     )
6547 6725
             })
6548
-        }
6549
-
6550
-        shared <- BiocParallel::bplapply(common_names, FUN,
6551
-            ref = ref,
6552
-            sel = sel,
6553
-            ref_key = ref_key,
6554
-            sel_key = sel_key,
6555
-            tp_col = tp_col,
6556
-            BPPARAM = p
6557
-        ) %>%
6726
+            if (!is.null(progress)) {
6727
+                progress()
6728
+            }
6729
+            sharing
6730
+        }
6731
+
6732
+        shared <- .execute_map_job(
6733
+            data_list = common_names,
6734
+            fun_to_apply = FUN,
6735
+            fun_args = list(
6736
+                ref = ref,
6737
+                sel = sel,
6738
+                ref_key = ref_key,
6739
+                sel_key = sel_key,
6740
+                tp_col = tp_col
6741
+            ),
6742
+            stop_on_error = TRUE
6743
+        )$res %>%
6558 6744
             purrr::set_names(common_names)
6559 6745
         return(shared)
6560 6746
     }
... ...
@@ -406,13 +406,44 @@
406 406
 # Error message displayed for suggestion packages that are not installed
407 407
 # but required by the called function
408 408
 .missing_pkg_error <- function(pkg, lib = "CRAN") {
409
-    inst_sugg <- if (lib == "CRAN") {
410
-        paste0('To install: `install.packages("', pkg, '")`')
411
-    } else if (lib == "BIOC") {
412
-        paste0('To install: `BiocManager::install("', pkg, '")`')
409
+    if (!is.null(names(pkg))) {
410
+        pkgs_str <- paste0('"', names(pkg), '"', collapse = ", ")
411
+        from_cran <- pkg[pkg == "CRAN"]
412
+        from_cran <- if (length(from_cran) > 0) {
413
+            paste0(
414
+                "`install.packages(",
415
+                paste0('"', names(from_cran), '"', collapse = ","),
416
+                ")`"
417
+            )
418
+        } else {
419
+            NULL
420
+        }
421
+
422
+        from_bioc <- pkg[pkg == "BIOC"]
423
+        from_bioc <- if (length(from_bioc) > 0) {
424
+            paste0(
425
+                "`BiocManager::install(",
426
+                paste0('"', names(from_bioc), '"', collapse = ","),
427
+                ")`"
428
+            )
429
+        } else {
430
+            NULL
431
+        }
432
+        inst_sugg <- paste0("To install: ", from_cran, ", ", from_bioc)
433
+    } else {
434
+        pkgs_str <- paste0('"', pkg, '"', collapse = ", ")
435
+        inst_sugg <- if (lib == "CRAN") {
436
+            paste0("To install: `install.packages(", pkgs_str, ")`")
437
+        } else if (lib == "BIOC") {
438
+            paste0("To install: `BiocManager::install(", pkgs_str, ")`")
439
+        }
413 440
     }
414
-    c("Missing package",
415
-        x = paste0("Package '", pkg, "' is required for this functionality."),
441
+
442
+    c("Missing package(s)",
443
+        x = paste0(
444
+            "Package(s) ", pkgs_str,
445
+            " are required for this functionality."
446
+        ),
416 447
         i = inst_sugg
417 448
     )
418 449
 }
... ...
@@ -984,7 +984,7 @@ fisher_scatterplot <- function(fisher_df,
984 984
 #'     stable_timepoints = c(90, 180, 360),
985 985
 #'     cell_type = "Other"
986 986
 #' )
987
-#' p <- HSC_population_plot(estimate, "PJ01")
987
+#' p <- HSC_population_plot(estimate$est, "PJ01")
988 988
 #' p
989 989
 HSC_population_plot <- function(estimates,
990 990
     project_name,
... ...
@@ -1108,7 +1108,6 @@ HSC_population_plot <- function(estimates,
1108 1108
 #' @family Plotting functions
1109 1109
 #' @importFrom rlang abort eval_tidy call2 inform .data fn_fmls_names dots_list
1110 1110
 #' @importFrom dplyr group_by across group_keys everything pull group_split
1111
-#' @importFrom BiocParallel SnowParam MulticoreParam bplapply bpstop
1112 1111
 #' @importFrom purrr set_names
1113 1112
 #' @importFrom tidyr unite
1114 1113
 #'
... ...
@@ -1177,13 +1176,7 @@ integration_alluvial_plot <- function(x,
1177 1176
         dplyr::group_split() %>%
1178 1177
         purrr::set_names(group_names)
1179 1178
 
1180
-    # Compute plots in parallel
1181
-    p <- BiocParallel::MulticoreParam(
1182
-        stop.on.error = FALSE, progressbar = TRUE,
1183
-        tasks = length(groups_to_plot), exportglobals = FALSE
1184
-    )
1185
-
1186
-
1179
+    # # Compute plots in parallel
1187 1180
     FUN <- function(group_df,
1188 1181
     plot_x,
1189 1182
     plot_y,
... ...
@@ -1191,7 +1184,8 @@ integration_alluvial_plot <- function(x,
1191 1184
     alluvia_plot_y_threshold,
1192 1185
     top_abundant_tbl,
1193 1186
     empty_space_color,
1194
-    other_params) {
1187
+    other_params,
1188
+    progress) {
1195 1189
         cleaned <- .clean_data(
1196 1190
             group_df, plot_x, plot_y,
1197 1191
             alluvia, alluvia_plot_y_threshold
... ...
@@ -1231,8 +1225,14 @@ integration_alluvial_plot <- function(x,
1231 1225
                     invokeRestart("missing")
1232 1226
                 }
1233 1227
             )
1228
+            if (!is.null(progress)) {
1229
+                progress()
1230
+            }
1234 1231
             return(list(plot = alluv_plot, tables = summary_tbls))
1235 1232
         }
1233
+        if (!is.null(progress)) {
1234
+            progress()
1235
+        }
1236 1236
         return(alluv_plot)
1237 1237
     }
1238 1238
 
... ...
@@ -1245,20 +1245,30 @@ integration_alluvial_plot <- function(x,
1245 1245
             "alluvial_plot"
1246 1246
         )]
1247 1247
     dot_args <- dot_args[names(dot_args) %in% optional_params_names]
1248
-    results <- BiocParallel::bplapply(
1249
-        X = groups_to_plot,
1250
-        FUN = FUN,
1251
-        plot_x = plot_x,
1252
-        plot_y = plot_y,
1253
-        alluvia = alluvia,
1254
-        alluvia_plot_y_threshold = alluvia_plot_y_threshold,
1255
-        top_abundant_tbl = top_abundant_tbl,
1256
-        empty_space_color = empty_space_color,
1257
-        other_params = dot_args,
1258
-        BPPARAM = p
1248
+    results <- .execute_map_job(
1249
+        data_list = groups_to_plot,
1250
+        fun_to_apply = FUN,
1251
+        fun_args = list(
1252
+            plot_x = plot_x,
1253
+            plot_y = plot_y,
1254
+            alluvia = alluvia,
1255
+            alluvia_plot_y_threshold = alluvia_plot_y_threshold,
1256
+            top_abundant_tbl = top_abundant_tbl,
1257
+            empty_space_color = empty_space_color,
1258
+            other_params = dot_args
1259
+        ),
1260
+        stop_on_error = FALSE
1259 1261
     )
1260
-    BiocParallel::bpstop(p)
1261
-    return(results)
1262
+    if (!all(is.null(results$err))) {
1263
+        errors_msg <- purrr::flatten_chr(results$err)
1264
+        names(errors_msg) <- "x"
1265
+        errors_msg <- c(
1266
+            "Errors occurred during computation",
1267
+            errors_msg
1268
+        )
1269
+        rlang::inform(errors_msg)
1270
+    }
1271
+    return(results$res)
1262 1272
 }
1263 1273
 
1264 1274
 # Internal, used in integration_alluvial_plot to obtain the data frame
... ...
@@ -1309,7 +1319,8 @@ integration_alluvial_plot <- function(x,
1309 1319
 #' @importFrom ggplot2 ggplot aes_ geom_text scale_fill_viridis_d sym
1310 1320
 #' @importFrom dplyr group_by summarise pull across all_of
1311 1321
 #' @importFrom rlang .data
1312
-.alluvial_plot <- function(tbl, plot_x, plot_y, empty_space_color) {
1322
+.alluvial_plot <- function(tbl, plot_x, plot_y,
1323
+    empty_space_color) {
1313 1324
     sums_y <- tbl %>%
1314 1325
         dplyr::group_by(dplyr::across(dplyr::all_of(plot_x))) %>%
1315 1326
         dplyr::summarise(sums = sum(.data[[plot_y]])) %>%
... ...
@@ -1715,14 +1726,13 @@ sharing_heatmap <- function(sharing_df,
1715 1726
             }
1716 1727
             plot
1717 1728
         }
1729
+        heatmap_rel <- purrr::map(
1730
+            unique(rel_sharing_col),
1731
+            ~ plot_rel_heat(.x, df = sharing_df_rounding)
1732
+        ) %>%
1733
+            purrr::set_names(rel_sharing_col)
1718 1734
     }
1719 1735
 
1720
-    heatmap_rel <- purrr::map(
1721
-        unique(rel_sharing_col),
1722
-        ~ plot_rel_heat(.x, df = sharing_df_rounding)
1723
-    ) %>%
1724
-        purrr::set_names(rel_sharing_col)
1725
-
1726 1736
     result <- list(absolute = heatmap_abs)
1727 1737
     result <- append(result, heatmap_rel)
1728 1738
     if (interactive) {
... ...
@@ -68,6 +68,7 @@
68 68
 #' the matching is case-insensitive.
69 69
 #' @param tissue_type The tissue types to include in the models. Note that
70 70
 #' the matching is case-insensitive.
71
+#' @param max_workers Maximum parallel workers allowed
71 72
 #'
72 73
 #' @section Required tags:
73 74
 #' The function will explicitly check for the presence of these tags:
... ...
@@ -116,7 +117,8 @@ HSC_population_size_estimate <- function(x,
116 117
     fragmentEstimate_threshold = 3,
117 118
     nIS_threshold = 5,
118 119
     cell_type = "MYELOID",
119
-    tissue_type = "PB") {
120
+    tissue_type = "PB",
121
+    max_workers = 4) {
120 122
     # Param check
121 123
     ## Basic checks on types
122 124
     stopifnot(is.data.frame(x))
... ...
@@ -139,6 +141,7 @@ HSC_population_size_estimate <- function(x,
139 141
     nIS_threshold <- nIS_threshold[1]
140 142
     stopifnot(is.character(cell_type))
141 143
     stopifnot(is.character(tissue_type))
144
+    stopifnot(is.numeric(max_workers))
142 145
     ## Convert cell_type and tissue_type to uppercase (for case insensitivity)
143 146
     cell_type <- stringr::str_to_upper(cell_type)
144 147
     tissue_type <- stringr::str_to_upper(tissue_type)
... ...
@@ -181,12 +184,19 @@ HSC_population_size_estimate <- function(x,
181 184
     }
182 185
     ## Check seqCount col in x
183 186
     if (!seqCount_column %in% colnames(x)) {
184
-        rlang::abort(c("Sequence count column not found in x"))
187
+        sc_err <- c("Sequence count column not found in x")
188
+        rlang::abort(sc_err)
185 189
     }
186 190
     ## Check fragmentEstimate col in x
187 191
     if (!is.null(fragmentEstimate_column) &&
188 192
         !fragmentEstimate_column %in% colnames(x)) {
189
-        rlang::abort(c("Fragment estimate column not found in x"))
193
+        fe_err <- c("Fragment estimate column not found in x",
194
+            i = paste(
195
+                "To ignore fragment estimate, set",
196
+                "`fragmentEstimate_column = NULL`"
197
+            )
198
+        )
199
+        rlang::abort(fe_err)
190 200
     }
191 201
     ## Reorder stable timepoints
192 202
     if (is.null(stable_timepoints)) {
... ...
@@ -196,7 +206,8 @@ HSC_population_size_estimate <- function(x,
196 206
     ## Check presence of NumIS column
197 207
     if (!"NumIS" %in% colnames(metadata)) {
198 208
         if (getOption("ISAnalytics.verbose", TRUE) == TRUE) {
199
-            rlang::inform(c("Calculating number of IS for each group..."))
209
+            is_msg <- c("Calculating number of IS for each group...")
210
+            rlang::inform(is_msg)
200 211
         }
201 212
         numIs <- x %>%
202 213
             dplyr::left_join(metadata, by = dplyr::all_of(aggregation_key)) %>%
... ...
@@ -231,6 +242,