博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
HDFS文件上传下载(Excel 和Csv)
阅读量:3897 次
发布时间:2019-05-23

本文共 17543 字,大约阅读时间需要 58 分钟。

package com.dragonsoft.cicada.datacenter.modules;

import java.io.*;
import java.net.URI;
import java.net.URLEncoder;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Locale;
import java.util.zip.ZipOutputStream;

import javax.servlet.ServletOutputStream;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import cn.hutool.core.text.csv.CsvData;
import cn.hutool.core.text.csv.CsvUtil;
import com.code.common.utils.StringUtils;
import com.csvreader.CsvReader;
import com.csvreader.CsvWriter;
import com.dragoninfo.dfw.bean.Result;
import com.dragonsoft.cicada.datacenter.modules.dataplan.externaldatasources.vo.DataSetAuthVo;
import com.fw.service.BaseService;
import com.fw.service.annotation.Service;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.IOUtils;
import org.apache.poi.hssf.usermodel.HSSFRow;
import org.apache.poi.hssf.usermodel.HSSFSheet;
import org.apache.poi.hssf.usermodel.HSSFWorkbook;
import org.apache.poi.ss.usermodel.*;
import org.apache.poi.xssf.usermodel.XSSFRow;
import org.apache.poi.xssf.usermodel.XSSFSheet;
import org.apache.poi.xssf.usermodel.XSSFWorkbook;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.multipart.MultipartFile;

/**
 * HDFS upload/download test endpoints for Excel and CSV files.
 *
 * <p>Exposes REST endpoints under {@code /HdfsTest} for uploading local files to HDFS,
 * streaming HDFS files back as browser downloads, merging CSV files, listing/renaming/
 * deleting HDFS paths, and reading Excel sheet metadata via Apache POI.</p>
 *
 * <p>NOTE(review): the shared static {@link Configuration} is mutated by {@link #init()}
 * on every request; this is only safe because all requests use the same {@code hdfsUrl}.
 * If per-request clusters are ever needed, make the configuration request-scoped.</p>
 */
@Controller
@CrossOrigin
@RequestMapping("/HdfsTest")
@Api(value = "HdfsTest|HDFS测试")
@Slf4j
public class HdfsTest extends BaseService {

    /** HDFS NameNode URI (e.g. {@code hdfs://host:8020}), injected from application config. */
    @Value("${hdfs.defaultFS}")
    private String hdfsUrl;

    /** HDFS login user, defaults to "root". Currently unused by the endpoints below. */
    @Value("${hdfs.hadoopLoginName:root}")
    private String hadoopLoginName;

    /** Shared Hadoop configuration pointed at {@link #hdfsUrl} by {@link #init()}. */
    private static Configuration conf = new Configuration();

    /**
     * Initializes {@code fs.defaultFS} from the injected {@link #hdfsUrl}.
     *
     * @return the configured HDFS URL wrapped in a success result
     */
    @ResponseBody
    @RequestMapping("/initConfig")
    @ApiOperation(value = "初始化配置")
    public Result initConfig() throws IOException {
        if (StringUtils.isBlank(hdfsUrl)) {
            log.warn("----------hdfsUrl为空-------");
        }
        conf.set("fs.defaultFS", hdfsUrl);
        return Result.success(hdfsUrl);
    }

    /**
     * Creates a new file at {@code dst} on the default FileSystem and writes {@code contents}.
     *
     * @param dst      target HDFS path
     * @param contents bytes to write
     * @throws IOException if the FileSystem or stream operations fail
     */
    public static void createFile(String dst, byte[] contents) throws IOException {
        Configuration localConf = new Configuration();
        // try-with-resources: the original leaked fs/outputStream on exceptions
        try (FileSystem fs = FileSystem.get(localConf);
             FSDataOutputStream outputStream = fs.create(new Path(dst))) {
            outputStream.write(contents);
        }
        log.info("文件创建成功!");
    }

    /** Points the shared Configuration at the configured HDFS cluster. */
    private void init() {
        conf.set("fs.defaultFS", hdfsUrl);
        conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
    }

    /**
     * Copies a local file into HDFS and logs the target directory listing.
     *
     * @param src local source path
     * @param dst HDFS destination path
     */
    @ResponseBody
    @RequestMapping("/uploadFile")
    @ApiOperation(value = "上传本地文件")
    public Result uploadFile(String src, String dst) throws IOException {
        try {
            init();
            try (FileSystem fs = FileSystem.get(conf)) {
                Path srcPath = new Path(src);  // 本地上传文件路径
                Path dstPath = new Path(dst);  // hdfs目标路径
                // first argument: whether to delete the local source (false = keep it)
                fs.copyFromLocalFile(false, srcPath, dstPath);
                log.info("Upload to {}", conf.get("fs.defaultFS"));
                log.info("------------list files------------");
                for (FileStatus file : fs.listStatus(dstPath)) {
                    log.info("{}", file.getPath());
                }
            }
        } catch (Exception e) {
            log.error("uploadFile failed, src={}, dst={}", src, dst, e);
        }
        return Result.success();
    }

    /**
     * Uploads a multipart file directly into a fixed HDFS directory.
     *
     * @param file uploaded file
     * @param bean unused; kept for caller compatibility
     */
    @ResponseBody
    @PostMapping("/uploadFile2")
    @ApiOperation(value = "上传本地文件(带文件的方式)")
    public Result uploadFile2(@RequestParam("file") MultipartFile file, String bean) {
        try {
            init();
            FileSystem fs = FileSystem.get(conf);
            InputStream in = new BufferedInputStream(file.getInputStream());
            String filename = file.getOriginalFilename();
            FSDataOutputStream out = fs.create(new Path("/user/iof/DatasourceTest/" + filename));
            // this copyBytes overload closes both streams when done
            IOUtils.copyBytes(in, out, conf);
        } catch (Exception e) {
            log.error("uploadFile2 failed, file={}", file.getOriginalFilename(), e);
        }
        return Result.success();
    }

    /**
     * Streams an HDFS file to the HTTP response as an attachment download.
     *
     * @param hdfsPath  HDFS path to read
     * @param locahPath unused legacy parameter, kept for caller compatibility
     */
    @ResponseBody
    @RequestMapping("/downloadFile")
    @ApiOperation(value = "下载文件")
    public Result downloadFile(HttpServletResponse response, String hdfsPath, String locahPath) {
        try {
            init();
            FileSystem fs = FileSystem.get(conf);
            InputStream in = fs.open(new Path(hdfsPath));
            response.setContentType("application/x-msDownload;charset=utf-8");
            response.setCharacterEncoding("UTF-8");
            // BUG fixed: new String(bytes, "utf-8") round-trip garbled the non-ASCII
            // filename in the header; URL-encode it instead (RFC 5987-ish percent form).
            response.setHeader("Content-Disposition",
                    "attachment;filename=" + URLEncoder.encode("文件下载.xlsx", "UTF-8"));
            ServletOutputStream sout = response.getOutputStream();
            try (BufferedInputStream bis = new BufferedInputStream(in);
                 BufferedOutputStream bos = new BufferedOutputStream(sout)) {
                byte[] buff = new byte[2048];
                int bytesRead;
                while (-1 != (bytesRead = bis.read(buff, 0, buff.length))) {
                    bos.write(buff, 0, bytesRead);
                }
                bos.flush();
            }
        } catch (Exception e) {
            log.error("downloadFile failed, hdfsPath={}", hdfsPath, e);
        }
        return Result.success();
    }

    /**
     * Lists the sheet names of an .xlsx workbook.
     *
     * @param path local path to the workbook
     * @return list of sheet names
     */
    @ResponseBody
    @RequestMapping("/readSheet")
    @ApiOperation(value = "获取excel的sheet页")
    public Result readSheet(String path) throws Exception {
        // BUG fixed: the original ignored the 'path' parameter and read a
        // hard-coded desktop file; it also never closed the workbook.
        List<String> sheetNames = new ArrayList<>();
        try (InputStream in = new FileInputStream(new File(path));
             XSSFWorkbook workbook = new XSSFWorkbook(in)) {
            for (int i = 0; i < workbook.getNumberOfSheets(); i++) {
                sheetNames.add(workbook.getSheetAt(i).getSheetName());
            }
        }
        return Result.success(sheetNames);
    }

    /**
     * Reads the header row of the first sheet of an uploaded Excel file and
     * returns the column labels.
     *
     * <p>NOTE(review): the tail of this method was corrupted in the pasted source;
     * the loop body below is reconstructed as "collect header cell text" — confirm
     * against the original callers.</p>
     */
    @ResponseBody
    @RequestMapping("/getExcelColumn")
    @ApiOperation(value = "获取excel每一列的字段")
    public Result getExcelColumn(@RequestParam("file") MultipartFile file) throws Exception {
        int sheetNum = 0;
        String fileHouZhui = "xlsx";  // extension currently fixed; TODO derive from file name
        Integer startColumn = 1;
        Workbook wb;
        if ("xls".equals(fileHouZhui)) {
            wb = new HSSFWorkbook(new FileInputStream(zhuanhuan(file)));
        } else if ("xlsx".equals(fileHouZhui)) {
            // BUG fixed: unknown extensions previously fell through with wb == null
            // and crashed with an NPE at wb.getSheetAt()
            wb = new XSSFWorkbook(new FileInputStream(zhuanhuan(file)));
        } else {
            throw new IllegalArgumentException("文件格式错误!");
        }
        try {
            Sheet sheet = wb.getSheetAt(sheetNum);
            Row headerRow = sheet.getRow(0);
            int num = headerRow.getPhysicalNumberOfCells();
            List<String> columns = new ArrayList<>();
            for (int i = startColumn - 1; i < num; i++) {
                Cell cell = headerRow.getCell(i);
                columns.add(cell == null ? "" : cell.toString());
            }
            return Result.success(columns);
        } finally {
            wb.close();
        }
    }

    /**
     * Copies an uploaded MultipartFile to a temporary local File so POI can open it.
     *
     * <p>NOTE(review): the original implementation of this helper was lost in the
     * pasted source; reconstructed as a temp-file copy — confirm behavior.</p>
     */
    private File zhuanhuan(MultipartFile file) throws IOException {
        File tmp = File.createTempFile("upload-", "-" + file.getOriginalFilename());
        file.transferTo(tmp);
        tmp.deleteOnExit();
        return tmp;
    }

    /**
     * Concatenates every file under {@code inputPath} into {@code outPath} byte-for-byte.
     *
     * <p>NOTE(review): the original body of this endpoint was corrupted in the pasted
     * source (only the copy-loop tail survived); reconstructed as a raw byte-level
     * concatenation — confirm against callers.</p>
     */
    @ResponseBody
    @RequestMapping("/mergeFile")
    @ApiOperation(value = "文件合并")
    public Result mergeFile(String inputPath, String outPath) throws Exception {
        init();
        FileSystem fsSource = FileSystem.get(URI.create(inputPath), conf);
        FileSystem fsDst = FileSystem.get(URI.create(outPath), conf);
        FileStatus[] sourceStatus = fsSource.listStatus(new Path(inputPath));
        try (FSDataOutputStream fsdos = fsDst.create(new Path(outPath))) {
            byte[] data = new byte[4096];
            for (FileStatus sta : sourceStatus) {
                try (FSDataInputStream fsdis = fsSource.open(sta.getPath())) {
                    int read;
                    while ((read = fsdis.read(data)) > 0) {
                        fsdos.write(data, 0, read);
                    }
                }
            }
        }
        return Result.success();
    }

    /**
     * Merges all {@code *.csv} files under {@code inputPath} into a single CSV at
     * {@code outPath}, keeping only the first file's header row.
     */
    @ResponseBody
    @RequestMapping("/mergeFile2")
    @ApiOperation(value = "文件合并2")
    public Result mergeFile2(String inputPath, String outPath) throws Exception {
        init();
        FileSystem fsSource = FileSystem.get(URI.create(inputPath), conf);
        FileSystem fsDst = FileSystem.get(URI.create(outPath), conf);
        FileStatus[] sourceStatus =
                fsSource.listStatus(new Path(inputPath), new MyPathFilter(".*\\.csv"));
        // BUG fixed: the original closed fsdos twice and never closed the readers
        try (FSDataOutputStream fsdos = fsDst.create(new Path(outPath))) {
            int count = 0;
            for (FileStatus sta : sourceStatus) {
                log.info("Path: {} FileSize: {} Limit: {}",
                        sta.getPath(), sta.getLen(), sta.getPermission());
                try (FSDataInputStream fsdis = fsSource.open(sta.getPath());
                     BufferedReader bufferedReader =
                             new BufferedReader(new InputStreamReader(fsdis, StandardCharsets.UTF_8))) {
                    // skip the header row of every file after the first
                    if (count != 0) {
                        bufferedReader.readLine();
                    }
                    count++;
                    String content;
                    while ((content = bufferedReader.readLine()) != null) {
                        // BUG fixed: getBytes() used the platform charset; force UTF-8
                        byte[] bytes = (content + "\n").getBytes(StandardCharsets.UTF_8);
                        fsdos.write(bytes, 0, bytes.length);
                    }
                }
            }
        }
        return Result.success();
    }

    /**
     * Local-disk CSV rewrite demo: copies a CSV, prefixing column 12 with "V".
     * Paths are hard-coded developer-desktop locations.
     */
    @ResponseBody
    @RequestMapping("/mergeFile3")
    @ApiOperation(value = "文件合并3")
    public Result mergeFile3() throws Exception {
        try {
            String csvFilePath = "C:\\Users\\Administrator\\Desktop\\20140227135936.csv";
            String sourceFileString = "C:\\Users\\Administrator\\Desktop\\test.csv";
            CsvReader reader = new CsvReader(csvFilePath);  // 默认是逗号分隔符,UTF-8编码
            CsvWriter writer = new CsvWriter(sourceFileString);
            // readRecord() advances to the next record; first call consumes the header
            reader.readRecord();
            // 逐行读取,以免文件太大; 在第12列数据统一加前缀"V"
            while (reader.readRecord()) {
                String[] stringList = reader.getValues();
                stringList[11] = 'V' + stringList[11];
                writer.writeRecord(stringList);
            }
            reader.close();
            writer.close();
        } catch (Exception ex) {
            log.error("mergeFile3 failed", ex);
        }
        return Result.success();
    }

    /**
     * Streams an HDFS CSV file to the HTTP response as an attachment download.
     *
     * <p>NOTE(review): the attachment is still named "文件下载.xlsx" even though this
     * is the CSV endpoint — kept for compatibility, but likely a copy-paste leftover.</p>
     *
     * @param hdfsPath  HDFS path to read
     * @param locahPath unused legacy parameter, kept for caller compatibility
     */
    @ResponseBody
    @RequestMapping("/downloadFileCsv")
    @ApiOperation(value = "下载文件CSV")
    public Result downloadFileCsv(HttpServletResponse response, String hdfsPath, String locahPath) {
        try {
            init();
            FileSystem fs = FileSystem.get(conf);
            InputStream in = fs.open(new Path(hdfsPath));
            response.setContentType("application/x-msDownload;charset=utf-8");
            response.setCharacterEncoding("UTF-8");
            // BUG fixed: same broken filename encoding as downloadFile — URL-encode it
            response.setHeader("Content-Disposition",
                    "attachment;filename=" + URLEncoder.encode("文件下载.xlsx", "UTF-8"));
            ServletOutputStream sout = response.getOutputStream();
            try (BufferedInputStream bis = new BufferedInputStream(in);
                 BufferedOutputStream bos = new BufferedOutputStream(sout)) {
                byte[] buff = new byte[2048];
                int bytesRead;
                while (-1 != (bytesRead = bis.read(buff, 0, buff.length))) {
                    bos.write(buff, 0, bytesRead);
                }
                bos.flush();
            }
        } catch (Exception e) {
            log.error("downloadFileCsv failed, hdfsPath={}", hdfsPath, e);
        }
        return Result.success();
    }

    /**
     * Renames an HDFS path.
     *
     * @param oldName existing path
     * @param newName target path
     */
    @ResponseBody
    @RequestMapping("/rename")
    @ApiOperation(value = "文件重命名")
    public void rename(String oldName, String newName) throws IOException {
        conf.set("fs.defaultFS", hdfsUrl);
        try (FileSystem fs = FileSystem.get(conf)) {
            boolean isok = fs.rename(new Path(oldName), new Path(newName));
            if (isok) {
                log.info("rename ok!");
            } else {
                log.info("rename failure");
            }
        }
    }

    /**
     * Marks {@code filePath} for deletion on FileSystem close.
     *
     * <p>NOTE(review): deleteOnExit defers deletion until the FileSystem closes —
     * which happens immediately here, so this effectively deletes the path; an
     * explicit {@code fs.delete(path, true)} would be clearer.</p>
     */
    public static void delete3(String filePath) throws IOException {
        Configuration localConf = new Configuration();
        try (FileSystem fs = FileSystem.get(localConf)) {
            boolean isok = fs.deleteOnExit(new Path(filePath));
            if (isok) {
                log.info("delete ok!");
            } else {
                log.info("delete failure");
            }
        }
    }

    /**
     * Creates a directory (and missing parents) at {@code path}.
     *
     * @return success result carrying the boolean mkdirs outcome
     */
    @ResponseBody
    @RequestMapping("/mkdir")
    @ApiOperation(value = "创建目录")
    public Result mkdir(String path) throws IOException {
        conf.set("fs.defaultFS", hdfsUrl);
        boolean isok;
        try (FileSystem fs = FileSystem.get(conf)) {
            isok = fs.mkdirs(new Path(path));
        }
        if (isok) {
            log.info("create {} dir ok!", path);
        } else {
            log.info("create {} dir failure", path);
        }
        return Result.success(isok);
    }

    /**
     * Copies the contents of an HDFS file to standard output.
     */
    public static void readFile(String filePath) throws IOException {
        Configuration localConf = new Configuration();
        FileSystem fs = FileSystem.get(localConf);
        InputStream in = null;
        try {
            in = fs.open(new Path(filePath));
            IOUtils.copyBytes(in, System.out, 4096, false);  // 复制到标准输出流
        } finally {
            IOUtils.closeStream(in);
        }
    }

    /**
     * Logs name/size/path of every file directly under {@code direPath}.
     */
    @ResponseBody
    @RequestMapping("/getDirectoryFromHdfs")
    @ApiOperation(value = "遍历指定目录(direPath)下的所有文件")
    public void getDirectoryFromHdfs(String direPath) {
        try {
            conf.set("fs.defaultFS", hdfsUrl);
            conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
            try (FileSystem fs = FileSystem.get(URI.create(direPath), conf)) {
                // banner hoisted out of the loop (original printed it per file)
                log.info("_________{}目录下所有文件______________", direPath);
                for (FileStatus fileStatus : fs.listStatus(new Path(direPath))) {
                    log.info("Name:{}", fileStatus.getPath().getName());
                    log.info("Size:{}", fileStatus.getLen());
                    log.info("Path:{}", fileStatus.getPath());
                }
            }
        } catch (Exception e) {
            log.error("getDirectoryFromHdfs failed, direPath={}", direPath, e);
        }
    }

    /** Ad-hoc smoke test entry point. */
    public static void main(String[] args) throws IOException {
        System.out.println("AB属于第几列 ------" + excelColStrToNum("iv"));
    }

    /**
     * Converts an Excel column label (e.g. "A", "AB", "IV") to its 1-based index.
     *
     * @param column column label; lower- or upper-case letters are accepted
     * @return 1-based column index (e.g. "A"=1, "Z"=26, "AA"=27, "IV"=256)
     */
    public static int excelColStrToNum(String column) {
        // BUG fixed: the original computed (ch - 'A' + 1) without upper-casing,
        // so lowercase input — as used by main() with "iv" — produced garbage.
        String col = column.toUpperCase(Locale.ROOT);
        int result = 0;
        for (int i = 0; i < col.length(); i++) {
            // base-26 positional accumulation (Horner's scheme)
            result = result * 26 + (col.charAt(i) - 'A' + 1);
        }
        return result;
    }
}
package com.dragonsoft.cicada.datacenter.modules;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;

/**
 * A {@link PathFilter} that accepts a path exactly when its full string form
 * matches the regular expression supplied at construction time.
 *
 * @author: yecc
 * @create: 2021-03-23 15:29
 */
public class MyPathFilter implements PathFilter {

    // Regular expression the whole path string must match to be accepted.
    String reg = null;

    /**
     * @param pattern regular expression applied to {@code path.toString()}
     */
    public MyPathFilter(String pattern) {
        this.reg = pattern;
    }

    /** Accepts {@code path} iff its string form matches the configured regex. */
    @Override
    public boolean accept(Path path) {
        return path.toString().matches(reg);
    }
}

转载地址:http://tnyen.baihongyu.com/

你可能感兴趣的文章
PAT---A1019. General Palindromic Number (20)
查看>>
PAT---A1027. Colors in Mars (20)
查看>>
PAT---1058. A+B in Hogwarts (20)
查看>>
PAT---A1001. A+B Format (20)
查看>>
PAT---A1005. Spell It Right (20)
查看>>
PAT---A1035. Password (20)
查看>>
PAT---A1077. Kuchiguse (20)
查看>>
PAT---A1062. Talent and Virtue (25)
查看>>
PAT---A1012. The Best Rank (25)
查看>>
数据库SQL语言语法总结3---查询语句
查看>>
数据库SQL语言语法总结4---数据更新
查看>>
数据库SQL语言语法总结5---视图
查看>>
数据库SQL语言语法总结6---数据控制
查看>>
数据库SQL语言语法总结7---嵌入式SQL
查看>>
数据库SQL语言语法总结1---表操作
查看>>
Numpy中stack(),hstack(),vstack()函数详解
查看>>
基于3D卷积神经网络的行为识别
查看>>
K.function用法
查看>>
keras -- multi-loss
查看>>
pytorch数据增强的具体细节
查看>>