7 min read

GdT-Pyr 7 - Parallelisation

https://forgemia.inra.fr/gdtpyr/gdt_pyr/-/tree/main/GDT_PyR7_parallelization

_ packages requis_

###########################
install.packages('pryr')    # outils pour comprendre le langage R a bas niveau
install.packages('microbenchmark') # profillage de code
install.packages('parallel') # parallélisme
install.packages('doParallel') # parallélisme
install.packages('lubridate') # gestion du format date
install.packages('httr')      # client HTTP
install.packages('foreach')   # facilite la création de boucle for parallelisées
install.packages('readr')     # lecture de fichiers
install.packages('future')    # objets promis (gestion asynchrone)

Exemples extraits de “Calcul Parallele avec R”, V.Miele & V.Louvet, edp sciences, 2016 (chargement des packages requis pour les exemples suivants)

library('pryr') 
## Error in library("pryr"): aucun package nommé 'pryr' n'est trouvé
library('microbenchmark')
## Error in library("microbenchmark"): aucun package nommé 'microbenchmark' n'est trouvé
library('parallel')
library('doParallel')
## Error in library("doParallel"): aucun package nommé 'doParallel' n'est trouvé

fonction exemple : erreur de prediction d’une regresseur par approche ‘leave one out’

leave.one.out = function(i)
{
    model = lm(Petal.Width ~ Petal.Length,data=my.iris[-i,])
    pred.petal.width = predict(model,data.frame(Petal.Length=my.iris[i,"Petal.Length"]))
    return((pred.petal.width-my.iris[i,"Petal.Width"])^2)
}

######################
# donnees de l'exemple : iris est automatiquement disponible pour tout processus R (mais pas my.iris)
my.iris = iris

######################
# vectorisation classique
microbenchmark(
    lapply(1:150, FUN = function(i) leave.one.out(i)),
    times=25
)

#########################
# nb Coeur total 
detectCores()
Unit: milliseconds
                                              expr      min       lq     mean   median       uq
 lapply(1:150, FUN = function(i) leave.one.out(i)) 169.5197 173.3894 176.8368 175.5258 179.1044
      max neval
 193.8009    25
[1] 8

APPROCHE PARALLELE ‘SNOW’ : SOCKET (multi-plateforme)

parLApply

test 1

cl = makeCluster(4, type="PSOCK")
# clusterExport(cl,list("my.iris"))
# clusterExport(cl,list("leave.one.out"))
#  clusterExport(cl,list("leave.one.out","my.iris"))
# 
microbenchmark(
    parLapply(cl,1:150,fun=function(i) leave.one.out(i)),
    times=25
)
## Error in microbenchmark(parLapply(cl, 1:150, fun = function(i) leave.one.out(i)), : impossible de trouver la fonction "microbenchmark"
stopCluster(cl)
### test 2
cl = makeCluster(4, type="PSOCK")
clusterExport(cl,list("leave.one.out"))
## Error in get(name, envir = envir): objet 'leave.one.out' introuvable
microbenchmark(
    parLapply(cl,1:150,fun=function(i) leave.one.out(i)),
    times=25
)
## Error in microbenchmark(parLapply(cl, 1:150, fun = function(i) leave.one.out(i)), : impossible de trouver la fonction "microbenchmark"
stopCluster(cl)

test 3

cl = makeCluster(4, type="PSOCK")
 clusterExport(cl,list("leave.one.out","my.iris"))

microbenchmark(
    parLapply(cl,1:150,fun=function(i) leave.one.out(i)),
    times=25
)
stopCluster(cl)
Unit: milliseconds
                                                     expr      min       lq     mean   median
 parLapply(cl, 1:150, fun = function(i) leave.one.out(i)) 47.06019 49.39344 57.31927 50.66847
       uq      max neval
 60.99022 112.9672    25

utilisation memoire

L= lapply(1:5e2,matrix,nrow=1e3,ncol=1e3)
L= lapply(1:6e2,matrix,nrow=2e3,ncol=1e3)

object_size(L)
mem_used()

# surveiller la memoire [top]
rm(L)
# surveiller la memoire [top]
gc()
# surveiller la memoire [top]

#comparaison des temps en parallel avec bcp de données et temps de traitement negligeable
L= lapply(1:6e2,matrix,nrow=2e3,ncol=1e3)

# sequentiel
system.time(lapply(L,FUN= function(e) {NULL}))

#parallele
cl = makeCluster(2, type="PSOCK")
system.time(parLapply(cl,L,fun= function(e) {NULL}))

# surveiller la memoire [top]
mem_used()
stopCluster(cl)
rm(L)
gc()
4.8 GB
4.86 GB
          used (Mb) gc trigger   (Mb)   max used   (Mb)
Ncells  814468 43.5    1442291   77.1    1442291   77.1
Vcells 1778650 13.6  824630579 6291.5 1030787594 7864.3
utilisateur     système      écoulé 
      0.002       0.000       0.002 
utilisateur     système      écoulé 
      4.865       1.062       9.055 
4.86 GB
          used (Mb) gc trigger   (Mb)   max used   (Mb)
Ncells  814442 43.5    1442291   77.1    1442291   77.1
Vcells 1778612 13.6  791709355 6040.3 1030787594 7864.3

APPROCHE PARALLELE MULTICORE (FORKING, LINUX)

mémoire

L= lapply(1:6e2,matrix,nrow=2e3,ncol=1e3)
cl = makeCluster(2, type="FORK")
system.time(parLapply(cl,L,fun= function(e) {NULL}))

mem_used()

stopCluster(cl)

# surveiller la memoire [top]
cl = makeCluster(2, type="FORK")
system.time(parLapply(cl,L,fun= function(e) {L[[1]][1,1]=0}))
# feinte top est piegé (meme zone memoire)  : c'est pas fiable...
stopCluster(cl)
utilisateur     système      écoulé 
      0.413       1.566       5.371 
4.86 GB
utilisateur     système      écoulé 
      0.440       1.805       7.236 

mclapply [parApply efficace en contexte multicore (forking)]

profite de la disponibilite automatique des donnees du processus pere syntaxe allegee

microbenchmark(
    1:150,fun=function(i) leave.one.out(i),mc.cores=4,
    times=50)
Unit: nanoseconds
     expr min  lq   mean median  uq  max neval
    1:150 297 398 535.20  452.5 594 3045    50
      fun  55  58 134.26   68.0  70 3134    50
 mc.cores   8  16  20.76   17.0  19  126    50

mcparallel & mccollect

foreach + Doparallel

Vectorisation classique Utilisation de foreach pour combiner les resultats en prenant la somme

microbenchmark(
    foreach(i=1:150, .combine=sum) %do%{ leave.one.out(i)},
    times=25
)
Unit: milliseconds
                                                             expr      min       lq     mean
 foreach(i = 1:150, .combine = sum) %do% {     leave.one.out(i) } 196.4287 204.2119 213.8368
   median       uq     max neval
 208.2454 220.8814 268.027    25

doParallel : avec combinison des sorties

registerDoParallel(cores=4)

microbenchmark(
    foreach(i=1:150, .combine=sum) %dopar% { leave.one.out(i)},
    times=25
)
Unit: milliseconds
                                                                expr      min       lq    mean
 foreach(i = 1:150, .combine = sum) %dopar% {     leave.one.out(i) } 324.8517 418.2593 419.838
   median       uq      max neval
 423.3756 427.1455 452.5507    25

Parallélisation de tâches

_ chargement des packages requis pour les exemples suivants_

library('lubridate') 
library('httr')   
library('foreach')
library('readr')
library('future')
library('jsonlite')

Nous allons aborder la notion de variables promises et de tâches asynchrones.

Exemple : le téléchargement de fichier

Préparation de téléchargement de fichiers de qualité de l’air

# entrepot de données pour la qualité de l'air
url <- "https://openaq-data.s3.amazonaws.com"

# nombre de jour
numberOfDays <- 5

# création du nombre de jour à récupérer pour le mois de mars 2018
# et des fichiers de sauvegardes
dates <- lubridate::ymd(paste0("2018", "-", "03", "-", seq(1, numberOfDays)))
savedfilePaths <- (paste0(tempdir(), "/2018", "-", "03", "-", seq(1, numberOfDays)))

print(savedfilePaths)
## [1] "C:\\Users\\sanchez\\AppData\\Local\\Temp\\Rtmpk7nN2G/2018-03-1"
## [2] "C:\\Users\\sanchez\\AppData\\Local\\Temp\\Rtmpk7nN2G/2018-03-2"
## [3] "C:\\Users\\sanchez\\AppData\\Local\\Temp\\Rtmpk7nN2G/2018-03-3"
## [4] "C:\\Users\\sanchez\\AppData\\Local\\Temp\\Rtmpk7nN2G/2018-03-4"
## [5] "C:\\Users\\sanchez\\AppData\\Local\\Temp\\Rtmpk7nN2G/2018-03-5"
print(dates)
## [1] "2018-03-01" "2018-03-02" "2018-03-03" "2018-03-04" "2018-03-05"
# nombre de repetition pour le profilage
benchmarkTimes = 5L

# recupère un csv d'une url et le sauvegarde sur le disque dur
##' @param filePath a string url
##' @param fileName fileName
##' @return 
getCSVFileFromUrl <- function(filePath, saveFilePath) {
  # recupération CSV d'un serveur web
  tmp <- httr::GET(filePath)
  # Stockage sur le disque dur dans les fichiers temporaires
  write(httr::content(tmp, 'text'), saveFilePath)
  return(saveFilePath)
}
[1] "/tmp/RtmpEvdyUS/2018-03-1" "/tmp/RtmpEvdyUS/2018-03-2" "/tmp/RtmpEvdyUS/2018-03-3"
[4] "/tmp/RtmpEvdyUS/2018-03-4" "/tmp/RtmpEvdyUS/2018-03-5"
[1] "2018-03-01" "2018-03-02" "2018-03-03" "2018-03-04" "2018-03-05"

récupération des fichiers (synchrone)

Téléchargement de fichiers normalement un par un

csvOnceAtTime <- function(){
  for (i in 1:length(dates)){
    fileName <- paste0(dates[i],".csv")
    filePath <- paste0(url,"/",fileName)
    csvSaveFilePath <- savedfilePaths[i]
    # sauvegarde des fichiers 
    getCSVFileFromUrl(filePath, csvSaveFilePath)
  }
}
benchmarkNonParallel1 <- microbenchmark::microbenchmark(csvOnceAtTime(),times = benchmarkTimes)
benchmarkNonParallel1

Débit : ~10 Mo/s

Unit: seconds
            expr      min       lq     mean   median       uq      max neval
 csvOnceAtTime() 43.98732 46.32592 46.54079 46.91352 47.11949 48.35772     5

récupération de fichiers synchrone

récupération des fichiers (asynchrone)

gestion des modes de parallélisation avec future Téléchargement de fichiers multiple

future::plan(multiprocess)
csvInParallel <- function(){
  futureValues <- list()
  for (i in 1:length(dates)){
    fileName <- paste0(dates[i],".csv")
    filePath <- paste0(url,"/",fileName)
    csvSaveFilePath <- savedfilePaths[i]
    # objets R promis
    futureData <-  future::future({getCSVFileFromUrl(filePath, csvSaveFilePath)})
    # ajout des chemins de sauvegarde a une liste
    futureValues <- append(futureValues, futureData)
  }
  # Effectuer une autre action pendant que les fichiers
  # se téléchargent
  while (!resolved(futureValues)) {
    future_time <- future::future({ 
      jsonlite::fromJSON("http://worldclockapi.com/api/json/utc/now") 
    })
    time <- value(future_time)
    #date time 
    print(paste0("Date time :",time$currentDateTime))
    #The timestamp is the number of 100-nanoseconds intervals 
    # (1 nanosecond = one billionth of a   second) since Jan 1, 1601 UTC
    print(paste0("File time :",time$currentFileTime))
    print("Waiting 2 seconds")
    Sys.sleep(2)
  }
  future::values(futureValues)
}
benchmarkParallel1 <- microbenchmark::microbenchmark(csvInParallel(),times = benchmarkTimes)
benchmarkParallel1

Débit entre 20 et 40 Mo/s

[1] "Date time :2019-04-08T06:30Z"
[1] "File time :131991786547457856"
[1] "Waiting 2 seconds"
[1] "Date time :2019-04-08T06:30Z"
[1] "File time :131991786592284944"
[1] "Waiting 2 seconds"
....
[1] "Date time :2019-04-08T06:32Z"
[1] "File time :131991787503780368"
[1] "Waiting 2 seconds"
[1] "Date time :2019-04-08T06:32Z"
[1] "File time :131991787545501808"
[1] "Waiting 2 seconds"
Unit: seconds
            expr      min       lq     mean   median       uq      max neval
 csvInParallel() 18.49188 20.23343 22.17851 22.49808 24.01268 25.65649     5

récupération de fichiers asynchrone

chargement de fichiers synchrone

dataOnceAtTime <- function(){
  dataFrameList = list()
  for (i in 1:length(savedfilePaths)){
    fileInDataFrame <- read_csv(savedfilePaths[i])
    dataFrameList = append(dataFrameList, fileInDataFrame)
  }
  df <- rbind(dataFrameList)
  # equivalent to without loop
  # df1 <-  read_csv(savedfilePaths[[1]])
  # df2 <-  read_csv(savedfilePaths[[2]])
  # df3 <-  read_csv(savedfilePaths[[3]])
  # df4 <-  read_csv(savedfilePaths[[4]])
  # df5 <-  read_csv(savedfilePaths[[5]])
  # df <- rbind(df1, df2, df3, df4, df5)
}

benchmarkNonParallel2 <- microbenchmark(dataOnceAtTime(),times = benchmarkTimes)
benchmarkNonParallel2
|=================================================================================| 100%  120 MB
|=================================================================================| 100%  120 MB
|=================================================================================| 100%  120 MB
|=================================================================================| 100%   86 MB
|=================================================================================| 100%   95 MB
Unit: seconds
             expr     min       lq     mean  median      uq      max neval
 dataOnceAtTime() 5.67231 5.792989 5.820838 5.85227 5.87071 5.915911     5

chargement de fichiers asynchrones

future::plan(multiprocess)
dataInParallel <- function(){
  futureList = list()
  for (i in 1:length(savedfilePaths)){
    futureTask <- future::future({read_csv(savedfilePaths[i])})
    futureList = append(futureList, futureTask)
  }
  df <- rbind(future::values(futureList))
  
   # equivalent to without loop
  # df1 <- future({ read_csv(savedfilePaths[[1]]) })
  # df2 <- future({ read_csv(savedfilePaths[[2]]) })
  # df3 <- future({ read_csv(savedfilePaths[[3]]) })
  # df4 <- future({ read_csv(savedfilePaths[[4]]) })
  # df5 <- future({ read_csv(savedfilePaths[[5]]) })
  # df <- rbind(value(df1), value(df2), value(df3), value(df4), value(df5))
}

benchmarkParallel2 <- microbenchmark(dataInParallel(),times = benchmarkTimes)
benchmarkParallel2
Unit: seconds
             expr      min       lq    mean   median       uq      max neval
 dataInParallel() 3.427962 3.512891 3.59065 3.594863 3.613915 3.803619     5