_ packages requis_
###########################
install.packages('pryr') # outils pour comprendre le langage R a bas niveau
install.packages('microbenchmark') # profillage de code
install.packages('parallel') # parallélisme
install.packages('doParallel') # parallélisme
install.packages('lubridate') # gestion du format date
install.packages('httr') # client HTTP
install.packages('foreach') # facilite la création de boucle for parallelisées
install.packages('readr') # lecture de fichiers
install.packages('future') # objets promis (gestion asynchrone)
Exemples extraits de “Calcul Parallele avec R”, V.Miele & V.Louvet, edp sciences, 2016 (chargement des packages requis pour les exemples suivants)
library('pryr')
## Error in library("pryr"): aucun package nommé 'pryr' n'est trouvé
library('microbenchmark')
## Error in library("microbenchmark"): aucun package nommé 'microbenchmark' n'est trouvé
library('parallel')
library('doParallel')
## Error in library("doParallel"): aucun package nommé 'doParallel' n'est trouvé
fonction exemple : erreur de prediction d’une regresseur par approche ‘leave one out’
leave.one.out = function(i)
{
model = lm(Petal.Width ~ Petal.Length,data=my.iris[-i,])
pred.petal.width = predict(model,data.frame(Petal.Length=my.iris[i,"Petal.Length"]))
return((pred.petal.width-my.iris[i,"Petal.Width"])^2)
}
######################
# donnees de l'exemple : iris est automatiquement disponible pour tout processus R (mais pas my.iris)
my.iris = iris
######################
# vectorisation classique
microbenchmark(
lapply(1:150, FUN = function(i) leave.one.out(i)),
times=25
)
#########################
# nb Coeur total
detectCores()
Unit: milliseconds
expr min lq mean median uq
lapply(1:150, FUN = function(i) leave.one.out(i)) 169.5197 173.3894 176.8368 175.5258 179.1044
max neval
193.8009 25
[1] 8
APPROCHE PARALLELE ‘SNOW’ : SOCKET (multi-plateforme)
parLApply
test 1
cl = makeCluster(4, type="PSOCK")
# clusterExport(cl,list("my.iris"))
# clusterExport(cl,list("leave.one.out"))
# clusterExport(cl,list("leave.one.out","my.iris"))
#
microbenchmark(
parLapply(cl,1:150,fun=function(i) leave.one.out(i)),
times=25
)
## Error in microbenchmark(parLapply(cl, 1:150, fun = function(i) leave.one.out(i)), : impossible de trouver la fonction "microbenchmark"
stopCluster(cl)
### test 2
cl = makeCluster(4, type="PSOCK")
clusterExport(cl,list("leave.one.out"))
## Error in get(name, envir = envir): objet 'leave.one.out' introuvable
microbenchmark(
parLapply(cl,1:150,fun=function(i) leave.one.out(i)),
times=25
)
## Error in microbenchmark(parLapply(cl, 1:150, fun = function(i) leave.one.out(i)), : impossible de trouver la fonction "microbenchmark"
stopCluster(cl)
test 3
cl = makeCluster(4, type="PSOCK")
clusterExport(cl,list("leave.one.out","my.iris"))
microbenchmark(
parLapply(cl,1:150,fun=function(i) leave.one.out(i)),
times=25
)
stopCluster(cl)
Unit: milliseconds
expr min lq mean median
parLapply(cl, 1:150, fun = function(i) leave.one.out(i)) 47.06019 49.39344 57.31927 50.66847
uq max neval
60.99022 112.9672 25
utilisation memoire
L= lapply(1:5e2,matrix,nrow=1e3,ncol=1e3)
L= lapply(1:6e2,matrix,nrow=2e3,ncol=1e3)
object_size(L)
mem_used()
# surveiller la memoire [top]
rm(L)
# surveiller la memoire [top]
gc()
# surveiller la memoire [top]
#comparaison des temps en parallel avec bcp de données et temps de traitement negligeable
L= lapply(1:6e2,matrix,nrow=2e3,ncol=1e3)
# sequentiel
system.time(lapply(L,FUN= function(e) {NULL}))
#parallele
cl = makeCluster(2, type="PSOCK")
system.time(parLapply(cl,L,fun= function(e) {NULL}))
# surveiller la memoire [top]
mem_used()
stopCluster(cl)
rm(L)
gc()
4.8 GB
4.86 GB
used (Mb) gc trigger (Mb) max used (Mb)
Ncells 814468 43.5 1442291 77.1 1442291 77.1
Vcells 1778650 13.6 824630579 6291.5 1030787594 7864.3
utilisateur système écoulé
0.002 0.000 0.002
utilisateur système écoulé
4.865 1.062 9.055
4.86 GB
used (Mb) gc trigger (Mb) max used (Mb)
Ncells 814442 43.5 1442291 77.1 1442291 77.1
Vcells 1778612 13.6 791709355 6040.3 1030787594 7864.3
APPROCHE PARALLELE MULTICORE (FORKING, LINUX)
mémoire
L= lapply(1:6e2,matrix,nrow=2e3,ncol=1e3)
cl = makeCluster(2, type="FORK")
system.time(parLapply(cl,L,fun= function(e) {NULL}))
mem_used()
stopCluster(cl)
# surveiller la memoire [top]
cl = makeCluster(2, type="FORK")
system.time(parLapply(cl,L,fun= function(e) {L[[1]][1,1]=0}))
# feinte top est piegé (meme zone memoire) : c'est pas fiable...
stopCluster(cl)
utilisateur système écoulé
0.413 1.566 5.371
4.86 GB
utilisateur système écoulé
0.440 1.805 7.236
mclapply [parApply efficace en contexte multicore (forking)]
profite de la disponibilite automatique des donnees du processus pere syntaxe allegee
microbenchmark(
1:150,fun=function(i) leave.one.out(i),mc.cores=4,
times=50)
Unit: nanoseconds
expr min lq mean median uq max neval
1:150 297 398 535.20 452.5 594 3045 50
fun 55 58 134.26 68.0 70 3134 50
mc.cores 8 16 20.76 17.0 19 126 50
mcparallel & mccollect
foreach + Doparallel
Vectorisation classique Utilisation de foreach pour combiner les resultats en prenant la somme
microbenchmark(
foreach(i=1:150, .combine=sum) %do%{ leave.one.out(i)},
times=25
)
Unit: milliseconds
expr min lq mean
foreach(i = 1:150, .combine = sum) %do% { leave.one.out(i) } 196.4287 204.2119 213.8368
median uq max neval
208.2454 220.8814 268.027 25
doParallel : avec combinison des sorties
registerDoParallel(cores=4)
microbenchmark(
foreach(i=1:150, .combine=sum) %dopar% { leave.one.out(i)},
times=25
)
Unit: milliseconds
expr min lq mean
foreach(i = 1:150, .combine = sum) %dopar% { leave.one.out(i) } 324.8517 418.2593 419.838
median uq max neval
423.3756 427.1455 452.5507 25
Parallélisation de tâches
_ chargement des packages requis pour les exemples suivants_
library('lubridate')
library('httr')
library('foreach')
library('readr')
library('future')
library('jsonlite')
Nous allons aborder la notion de variables promises et de tâches asynchrones.
Exemple : le téléchargement de fichier
Préparation de téléchargement de fichiers de qualité de l’air
# entrepot de données pour la qualité de l'air
url <- "https://openaq-data.s3.amazonaws.com"
# nombre de jour
numberOfDays <- 5
# création du nombre de jour à récupérer pour le mois de mars 2018
# et des fichiers de sauvegardes
dates <- lubridate::ymd(paste0("2018", "-", "03", "-", seq(1, numberOfDays)))
savedfilePaths <- (paste0(tempdir(), "/2018", "-", "03", "-", seq(1, numberOfDays)))
print(savedfilePaths)
## [1] "C:\\Users\\sanchez\\AppData\\Local\\Temp\\Rtmpk7nN2G/2018-03-1"
## [2] "C:\\Users\\sanchez\\AppData\\Local\\Temp\\Rtmpk7nN2G/2018-03-2"
## [3] "C:\\Users\\sanchez\\AppData\\Local\\Temp\\Rtmpk7nN2G/2018-03-3"
## [4] "C:\\Users\\sanchez\\AppData\\Local\\Temp\\Rtmpk7nN2G/2018-03-4"
## [5] "C:\\Users\\sanchez\\AppData\\Local\\Temp\\Rtmpk7nN2G/2018-03-5"
print(dates)
## [1] "2018-03-01" "2018-03-02" "2018-03-03" "2018-03-04" "2018-03-05"
# nombre de repetition pour le profilage
benchmarkTimes = 5L
# recupère un csv d'une url et le sauvegarde sur le disque dur
##' @param filePath a string url
##' @param fileName fileName
##' @return
getCSVFileFromUrl <- function(filePath, saveFilePath) {
# recupération CSV d'un serveur web
tmp <- httr::GET(filePath)
# Stockage sur le disque dur dans les fichiers temporaires
write(httr::content(tmp, 'text'), saveFilePath)
return(saveFilePath)
}
[1] "/tmp/RtmpEvdyUS/2018-03-1" "/tmp/RtmpEvdyUS/2018-03-2" "/tmp/RtmpEvdyUS/2018-03-3"
[4] "/tmp/RtmpEvdyUS/2018-03-4" "/tmp/RtmpEvdyUS/2018-03-5"
[1] "2018-03-01" "2018-03-02" "2018-03-03" "2018-03-04" "2018-03-05"
récupération des fichiers (synchrone)
Téléchargement de fichiers normalement un par un
csvOnceAtTime <- function(){
for (i in 1:length(dates)){
fileName <- paste0(dates[i],".csv")
filePath <- paste0(url,"/",fileName)
csvSaveFilePath <- savedfilePaths[i]
# sauvegarde des fichiers
getCSVFileFromUrl(filePath, csvSaveFilePath)
}
}
benchmarkNonParallel1 <- microbenchmark::microbenchmark(csvOnceAtTime(),times = benchmarkTimes)
benchmarkNonParallel1
Débit : ~10 Mo/s
Unit: seconds
expr min lq mean median uq max neval
csvOnceAtTime() 43.98732 46.32592 46.54079 46.91352 47.11949 48.35772 5
récupération des fichiers (asynchrone)
gestion des modes de parallélisation avec future Téléchargement de fichiers multiple
future::plan(multiprocess)
csvInParallel <- function(){
futureValues <- list()
for (i in 1:length(dates)){
fileName <- paste0(dates[i],".csv")
filePath <- paste0(url,"/",fileName)
csvSaveFilePath <- savedfilePaths[i]
# objets R promis
futureData <- future::future({getCSVFileFromUrl(filePath, csvSaveFilePath)})
# ajout des chemins de sauvegarde a une liste
futureValues <- append(futureValues, futureData)
}
# Effectuer une autre action pendant que les fichiers
# se téléchargent
while (!resolved(futureValues)) {
future_time <- future::future({
jsonlite::fromJSON("http://worldclockapi.com/api/json/utc/now")
})
time <- value(future_time)
#date time
print(paste0("Date time :",time$currentDateTime))
#The timestamp is the number of 100-nanoseconds intervals
# (1 nanosecond = one billionth of a second) since Jan 1, 1601 UTC
print(paste0("File time :",time$currentFileTime))
print("Waiting 2 seconds")
Sys.sleep(2)
}
future::values(futureValues)
}
benchmarkParallel1 <- microbenchmark::microbenchmark(csvInParallel(),times = benchmarkTimes)
benchmarkParallel1
Débit entre 20 et 40 Mo/s
[1] "Date time :2019-04-08T06:30Z"
[1] "File time :131991786547457856"
[1] "Waiting 2 seconds"
[1] "Date time :2019-04-08T06:30Z"
[1] "File time :131991786592284944"
[1] "Waiting 2 seconds"
....
[1] "Date time :2019-04-08T06:32Z"
[1] "File time :131991787503780368"
[1] "Waiting 2 seconds"
[1] "Date time :2019-04-08T06:32Z"
[1] "File time :131991787545501808"
[1] "Waiting 2 seconds"
Unit: seconds
expr min lq mean median uq max neval
csvInParallel() 18.49188 20.23343 22.17851 22.49808 24.01268 25.65649 5
chargement de fichiers synchrone
dataOnceAtTime <- function(){
dataFrameList = list()
for (i in 1:length(savedfilePaths)){
fileInDataFrame <- read_csv(savedfilePaths[i])
dataFrameList = append(dataFrameList, fileInDataFrame)
}
df <- rbind(dataFrameList)
# equivalent to without loop
# df1 <- read_csv(savedfilePaths[[1]])
# df2 <- read_csv(savedfilePaths[[2]])
# df3 <- read_csv(savedfilePaths[[3]])
# df4 <- read_csv(savedfilePaths[[4]])
# df5 <- read_csv(savedfilePaths[[5]])
# df <- rbind(df1, df2, df3, df4, df5)
}
benchmarkNonParallel2 <- microbenchmark(dataOnceAtTime(),times = benchmarkTimes)
benchmarkNonParallel2
|=================================================================================| 100% 120 MB
|=================================================================================| 100% 120 MB
|=================================================================================| 100% 120 MB
|=================================================================================| 100% 86 MB
|=================================================================================| 100% 95 MB
Unit: seconds
expr min lq mean median uq max neval
dataOnceAtTime() 5.67231 5.792989 5.820838 5.85227 5.87071 5.915911 5
chargement de fichiers asynchrones
future::plan(multiprocess)
dataInParallel <- function(){
futureList = list()
for (i in 1:length(savedfilePaths)){
futureTask <- future::future({read_csv(savedfilePaths[i])})
futureList = append(futureList, futureTask)
}
df <- rbind(future::values(futureList))
# equivalent to without loop
# df1 <- future({ read_csv(savedfilePaths[[1]]) })
# df2 <- future({ read_csv(savedfilePaths[[2]]) })
# df3 <- future({ read_csv(savedfilePaths[[3]]) })
# df4 <- future({ read_csv(savedfilePaths[[4]]) })
# df5 <- future({ read_csv(savedfilePaths[[5]]) })
# df <- rbind(value(df1), value(df2), value(df3), value(df4), value(df5))
}
benchmarkParallel2 <- microbenchmark(dataInParallel(),times = benchmarkTimes)
benchmarkParallel2
Unit: seconds
expr min lq mean median uq max neval
dataInParallel() 3.427962 3.512891 3.59065 3.594863 3.613915 3.803619 5