Untitled

 avatar
unknown
plain_text
2 years ago
1.5 kB
8
Indexable
# Environment variables locating the Hadoop command, the streaming jar,
# the Hadoop home directory and the JDK. Set in a single call.
Sys.setenv(
  HADOOP_CMD = "/usr/local/hadoop/bin/hadoop",
  HADOOP_STREAMING = "/usr/local/hadoop/share/hadoop/tools/lib/hadoop-streaming-3.3.4.jar",
  HADOOP_HOME = "/usr/local/hadoop",
  JAVA_HOME = "/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.362.b08-1.el7_9.x86_64"
)

# Packages used by this script.
library(rmr2)       # mapreduce(), keyval(), from.dfs()
library(rhdfs)      # hdfs.init()
library(lubridate)  # ymd()
library(stringr)    # str_sub()
library(rJava)      # not referenced directly in this script; presumably needed by rhdfs -- confirm
# Initialise the rhdfs connection before any HDFS work is done.
hdfs.init()


# Global defaults for the key (DocMese) and value (Qnt) accumulators;
# NULL means "nothing collected yet".
DocMese <- Qnt <- NULL

#' Mapper: emit one ("YYYY-MM FATTURA", quantity) pair per invoice field.
#'
#' Each input line is split on commas. Field 3 is parsed as a year-month-day
#' date and reduced to its "YYYY-MM" prefix, field 4 is read as the numeric
#' quantity. For every field of the line that equals "FATTURA" one key/value
#' pair is emitted (so a line with no such field emits nothing, and a line
#' with two such fields emits two pairs).
#'
#' All state is local to the call, so the result does not depend on any
#' global accumulator. Blank lines, NA lines and empty input chunks are
#' skipped instead of aborting the task.
#'
#' @param . Input key; not used.
#' @param lines Character vector of raw CSV lines.
#' @return `keyval()` of a one-column character matrix of keys and a
#'   one-column numeric matrix of quantities, or `keyval(NULL, NULL)` when
#'   nothing matched.
map <- function(., lines) {
  # An empty chunk has nothing to emit; `1:length(lines)` would have iterated
  # over c(1, 0) here and failed on the resulting NA.
  if (length(lines) == 0L) {
    return(keyval(NULL, NULL))
  }

  records <- strsplit(as.character(lines), ",", fixed = TRUE)

  # n-th field of every record; NA when the record is too short (blank line).
  nth_field <- function(n) {
    vapply(records, function(fields) fields[n], character(1))
  }

  AnnoMese <- str_sub(as.character(ymd(nth_field(3L))), 1, 7)
  Quantita <- as.numeric(nth_field(4L))

  # Diagnostics go to stderr: Hadoop streaming uses stdout as its data channel.
  cat(AnnoMese, sep = "\n", file = stderr())

  # Number of fields equal to "FATTURA" per record (0 for blank / NA lines).
  hits <- vapply(
    records,
    function(fields) sum(fields == "FATTURA", na.rm = TRUE),
    integer(1)
  )

  # Repeat each record index once per matching field, preserving input order.
  idx <- rep(seq_along(records), times = hits)
  if (length(idx) == 0L) {
    return(keyval(NULL, NULL))
  }

  keyval(
    matrix(paste0(AnnoMese[idx], " ", "FATTURA"), ncol = 1L),
    matrix(Quantita[idx], ncol = 1L)
  )
}

#' Reducer: average the values received for one key.
#'
#' Every value is rounded to two decimals first, then the mean of the
#' rounded values is emitted under the unchanged key.
#'
#' @param key Key shared by all values in `val`.
#' @param val Numeric values collected for `key`.
#' @return `keyval(key, <mean of rounded values>)`.
reduce <- function(key, val) {
  rounded <- round(val, digits = 2)
  average <- mean(rounded)
  keyval(key, average)
}

#' Run the map/reduce job over a text input.
#'
#' No combiner is used: the reducer computes a mean, and a mean of partial
#' means is not the mean of all values, so reusing the reducer as a combiner
#' would make the result depend on how the data is split across tasks.
#'
#' @param input Path of the text input.
#' @param output Path for the job output; NULL leaves the choice to
#'   `mapreduce()`.
#' @return Whatever `mapreduce()` returns (passed to `from.dfs()` by the
#'   caller).
job1 <- function(input, output = NULL) {
  mapreduce(
    input = input,
    output = output,
    input.format = "text",
    map = map,
    reduce = reduce,
    combine = NULL
  )
}




# Driver: run the job on /appoLuca/ordini.csv and print the averaged result.
hdfs.root <- '/appoLuca'
hdfs.data <- file.path(hdfs.root, 'ordini.csv')
hdfs.out <- file.path(hdfs.root, 'out')

# Remove the output of a previous run. The path is derived from hdfs.out so
# the two cannot drift apart; -f keeps the command from failing when the
# directory does not exist yet (e.g. on the first run).
system(paste("/usr/local/hadoop/bin/hdfs dfs -rm -r -f", shQuote(hdfs.out)))

out <- job1(hdfs.data, hdfs.out)
results <- from.dfs(out)
results.df <- as.data.frame(results, stringsAsFactors = FALSE)

# cat() interprets the newline; print() would show the quoted string with a
# literal "\n" in it.
cat("printing output...\n")
# Explicit print() so the column names appear even when the script is run
# via source(), where top-level values are not auto-printed.
print(colnames(results.df))
print(results.df)

Editor is loading...