
Spring Boot 2.x Integration with Hadoop 3.0.3: HDFS File System Management


Goal: set up a Spring Boot 2.x project integrated with Hadoop 3.0.3 and wrap management of HDFS, Hadoop's core file system component, behind a small service layer.

Core pom.xml:

<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.1.1.RELEASE</version>
	</parent>

	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
		<java.version>1.8</java.version>
		<mybatis-spring-boot-starter.version>1.3.2</mybatis-spring-boot-starter.version>
		<mysql-connector-java.version>8.0.11</mysql-connector-java.version>
		<com.alibaba.druid.version>1.1.9</com.alibaba.druid.version>
		<commons-lang.version>2.6</commons-lang.version>
		<commons-codec.version>1.10</commons-codec.version>
		<commons-lang3.version>3.8.1</commons-lang3.version>
		<commons-net.version>3.6</commons-net.version>
		<commons-io.version>2.6</commons-io.version>
		<commons-collections.version>3.2.1</commons-collections.version>
		<common-fileupload.version>1.3.1</common-fileupload.version>
		<fastjson.version>1.2.48</fastjson.version>
		<jasperreports.version>6.10.0</jasperreports.version>
	</properties>


	<dependencies>
		<!-- Spring Web module -->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
			<!-- Exclude Spring Boot's default Logback logging -->
			 <exclusions>
                <exclusion>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-starter-logging</artifactId>
                </exclusion>
            </exclusions>
		</dependency>

		<!-- Spring Boot test support -->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>


		<!-- Lombok -->
		<dependency>
			<groupId>org.projectlombok</groupId>
			<artifactId>lombok</artifactId>
			<!-- version managed by spring-boot-starter-parent -->
			<scope>provided</scope>
		</dependency>


		<!-- MyBatis and MySQL connector -->
		<dependency>
			<groupId>org.mybatis.spring.boot</groupId>
			<artifactId>mybatis-spring-boot-starter</artifactId>
			<version>${mybatis-spring-boot-starter.version}</version>
		</dependency>
		<dependency>
			<groupId>mysql</groupId>
			<artifactId>mysql-connector-java</artifactId>
			<version>${mysql-connector-java.version}</version>
			<scope>runtime</scope>
		</dependency>
		<dependency>
			<groupId>com.alibaba</groupId>
			<artifactId>druid-spring-boot-starter</artifactId>
			<version>${com.alibaba.druid.version}</version>
		</dependency>
		<!-- Pagination (PageHelper) -->
		<dependency>
			<groupId>com.github.pagehelper</groupId>
			<artifactId>pagehelper</artifactId>
			<version>4.1.6</version>
		</dependency>

		<!-- commons-lang utilities -->
		<dependency>
			<groupId>commons-lang</groupId>
			<artifactId>commons-lang</artifactId>
			<version>${commons-lang.version}</version>
		</dependency>
		<!-- commons-lang3 utilities -->
		<dependency>
			<groupId>org.apache.commons</groupId>
			<artifactId>commons-lang3</artifactId>
			<version>${commons-lang3.version}</version>
		</dependency>

		<!-- commons-codec codec/digest utilities -->
		<dependency>
			<groupId>commons-codec</groupId>
			<artifactId>commons-codec</artifactId>
			<version>${commons-codec.version}</version>
		</dependency>
		<!-- commons-net network utilities -->
		<dependency>
			<groupId>commons-net</groupId>
			<artifactId>commons-net</artifactId>
			<version>${commons-net.version}</version>
		</dependency>
		<!-- commons-io utilities -->
		<dependency>
			<groupId>commons-io</groupId>
			<artifactId>commons-io</artifactId>
			<version>${commons-io.version}</version>
		</dependency>
		<!-- commons-collections utilities -->
		<dependency>
			<groupId>commons-collections</groupId>
			<artifactId>commons-collections</artifactId>
			<version>${commons-collections.version}</version>
		</dependency>
		<!-- commons-fileupload -->
		<dependency>
			<groupId>commons-fileupload</groupId>
			<artifactId>commons-fileupload</artifactId>
			<version>${common-fileupload.version}</version>
		</dependency>

		<!-- Swagger2 -->
		<dependency>
			<groupId>io.springfox</groupId>
			<artifactId>springfox-swagger2</artifactId>
			<version>2.7.0</version>
		</dependency>
		<dependency>
			<groupId>io.springfox</groupId>
			<artifactId>springfox-swagger-ui</artifactId>
			<version>2.7.0</version>
		</dependency>

		<!-- fastjson -->
		<dependency>
			<groupId>com.alibaba</groupId>
			<artifactId>fastjson</artifactId>
			<version>${fastjson.version}</version>
		</dependency>

        <!-- Works around: Missing artifact jdk.tools:jdk.tools:jar:1.8 -->
		<dependency>
			<groupId>jdk.tools</groupId>
			<artifactId>jdk.tools</artifactId>
			<version>1.8</version>
			<scope>system</scope>
			<systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
		</dependency>

		<!-- spring-test -->
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-test</artifactId>
			<version>5.1.3.RELEASE</version>
		</dependency>

		<!-- Hadoop 3.0.3 -->
		<dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-common</artifactId>
			<version>3.0.3</version>
			<exclusions>
				<exclusion>
					<groupId>com.google.guava</groupId>
					<artifactId>guava</artifactId>
				</exclusion>
			</exclusions>
		</dependency>
		<dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-streaming</artifactId>
			<version>3.0.3</version>
		</dependency>
		<dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-yarn-common</artifactId>
			<version>3.0.3</version>
			<exclusions>
				<exclusion>
					<groupId>com.google.guava</groupId>
					<artifactId>guava</artifactId>
				</exclusion>
			</exclusions>
		</dependency>
		<dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-distcp</artifactId>
			<version>3.0.3</version>
			<scope>provided</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-mapreduce-client-core</artifactId>
			<version>3.0.3</version>
			<exclusions>
				<exclusion>
					<groupId>com.google.guava</groupId>
					<artifactId>guava</artifactId>
				</exclusion>
			</exclusions>
		</dependency>
		<dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-hdfs</artifactId>
			<version>3.0.3</version>
			<exclusions>
				<exclusion>
					<groupId>com.google.guava</groupId>
					<artifactId>guava</artifactId>
				</exclusion>
			</exclusions>
		</dependency>
		<dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-mapreduce-client-jobclient</artifactId>
			<version>3.0.3</version>
			<scope>provided</scope>
		</dependency>

Dependency conflicts and how they were resolved:

1. The Hadoop 3.0.3 core jars log through Log4j, while spring-boot-starter-web brings in Logback. I chose to keep Log4j, so the Logback-based spring-boot-starter-logging dependency is excluded from spring-boot-starter-web. Since the Hadoop jars already pull Log4j (and the slf4j-log4j12 binding) in transitively, Log4j then becomes the active logging backend and is configured through the log4j.properties shown later:

<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
			<!-- Exclude Spring Boot's default Logback logging -->
			 <exclusions>
                <exclusion>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-starter-logging</artifactId>
                </exclusion>
            </exclusions>
		</dependency>

2. The guava.jar version pulled in by the Hadoop 3.0.3 core packages conflicts with the guava.jar version required by the Swagger (springfox) dependencies.

Solution: since the Guava version Hadoop depends on is older, Guava is excluded from every Hadoop dependency so that the newer version required by Swagger is the one left on the classpath:

<!-- Hadoop 3.0.3 -->
		<dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-common</artifactId>
			<version>3.0.3</version>
			<exclusions>
				<exclusion>
					<groupId>com.google.guava</groupId>
					<artifactId>guava</artifactId>
				</exclusion>
			</exclusions>
		</dependency>
		<dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-streaming</artifactId>
			<version>3.0.3</version>
		</dependency>
		<dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-yarn-common</artifactId>
			<version>3.0.3</version>
			<exclusions>
				<exclusion>
					<groupId>com.google.guava</groupId>
					<artifactId>guava</artifactId>
				</exclusion>
			</exclusions>
		</dependency>
		<dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-distcp</artifactId>
			<version>3.0.3</version>
			<scope>provided</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-mapreduce-client-core</artifactId>
			<version>3.0.3</version>
			<exclusions>
				<exclusion>
					<groupId>com.google.guava</groupId>
					<artifactId>guava</artifactId>
				</exclusion>
			</exclusions>
		</dependency>
		<dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-hdfs</artifactId>
			<version>3.0.3</version>
			<exclusions>
				<exclusion>
					<groupId>com.google.guava</groupId>
					<artifactId>guava</artifactId>
				</exclusion>
			</exclusions>
		</dependency>
		<dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-mapreduce-client-jobclient</artifactId>
			<version>3.0.3</version>
			<scope>provided</scope>
		</dependency>

Configuration classes:

package com.zzg.hadoop.config;

import java.util.HashMap;
import java.util.Map;

import org.springframework.boot.web.servlet.FilterRegistrationBean;
import org.springframework.boot.web.servlet.ServletRegistrationBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import com.alibaba.druid.support.http.StatViewServlet;
import com.alibaba.druid.support.http.WebStatFilter;
/**
 * Druid monitoring configuration
 * @author zzg
 *
 */
@Configuration
public class DruidConfig {
	 	@Bean
	    public ServletRegistrationBean druidServletRegistrationBean() {
	        ServletRegistrationBean servletRegistrationBean = new ServletRegistrationBean();
	        servletRegistrationBean.setServlet(new StatViewServlet());
	        servletRegistrationBean.addUrlMappings("/druid/*");
	        servletRegistrationBean.addInitParameter("allow", "");
	        servletRegistrationBean.addInitParameter("deny", "");
	        servletRegistrationBean.addInitParameter("loginUsername", "admin");
	        servletRegistrationBean.addInitParameter("loginPassword", "admin");
	        return servletRegistrationBean;
	    }

	    /**
	     * Register the Druid WebStatFilter
	     *
	     * @return
	     */
	    @Bean
	    public FilterRegistrationBean druidFilterRegistrationBean() {
	        FilterRegistrationBean filterRegistrationBean = new FilterRegistrationBean();
	        filterRegistrationBean.setFilter(new WebStatFilter());
	        Map<String, String> initParams = new HashMap<String, String>();
	        // Requests excluded from statistics
	        initParams.put("exclusions", "*.js,*.gif,*.jpg,*.bmp,*.png,*.css,*.ico,/druid/*");
	        filterRegistrationBean.setInitParameters(initParams);
	        filterRegistrationBean.addUrlPatterns("/*");
	        return filterRegistrationBean;
	    }
}
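With this registration the Druid monitoring console is served at /druid/ after startup, protected by the loginUsername/loginPassword values configured above.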
package com.zzg.hadoop.config;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.hadoop.fs.FileSystem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Hadoop HDFS connection configuration
 * @author zzg
 *
 */
@Configuration
public class HadoopHDFSConfig {
	
	// Logger
	private static final Logger logger = LoggerFactory.getLogger(HadoopHDFSConfig.class);	

	@Value("${hdfs.hdfsPath}")
	private String hdfsPath;
	@Value("${hdfs.hdfsName}")
	private String hdfsName;
	
	/**
	 * Hadoop HDFS Configuration object
	 * @return
	 */
	@Bean
	public org.apache.hadoop.conf.Configuration  getConfiguration(){
		org.apache.hadoop.conf.Configuration configuration = new org.apache.hadoop.conf.Configuration();
		configuration.set("fs.defaultFS", hdfsPath);
		return configuration;
	}
	/**
	 * Hadoop FileSystem client
	 * @return
	 */
	@Bean
	public FileSystem getFileSystem(){
		FileSystem fileSystem = null;
		try {
			fileSystem = FileSystem.get(new URI(hdfsPath), getConfiguration(), hdfsName);
		} catch (IOException e) {
			// TODO Auto-generated catch block
			logger.error(e.getMessage());
		} catch (InterruptedException e) {
			// TODO Auto-generated catch block
			logger.error(e.getMessage());
		} catch (URISyntaxException e) {
			// TODO Auto-generated catch block
			logger.error(e.getMessage());
		}
		return fileSystem;
	}
	
	

}
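The FileSystem bean connects to the address given by hdfs.hdfsPath and acts as the HDFS user named by hdfs.hdfsName (root in the application.properties shown later), so HDFS-side directory permissions are evaluated against that user.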
package com.zzg.hadoop.config;

import java.util.Properties;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import com.github.pagehelper.PageHelper;

/**
 * MyBatis configuration
 * @author zzg
 *
 */
@Configuration
public class MyBatisConfig {
	/**
	 * Instantiate the PageHelper pagination plugin
	 * @return
	 */
	@Bean
	public PageHelper pageHelper() {
		PageHelper pageHelper = new PageHelper();
		Properties p = new Properties();
		p.setProperty("offsetAsPageNum", "true");
		p.setProperty("rowBoundsWithCount", "true");
		p.setProperty("reasonable", "true");
		p.setProperty("dialect", "mysql");
		pageHelper.setProperties(p);
		return pageHelper;
	}
}
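Because PageHelper implements MyBatis' Interceptor interface, exposing it as a bean lets mybatis-spring-boot-starter pick it up as a plugin; in the (omitted) business services a query is then paginated by calling PageHelper.startPage(pageNum, pageSize) right before the mapper call.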
package com.zzg.hadoop.config;

import java.util.ArrayList;
import java.util.List;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import io.swagger.annotations.ApiOperation;
import springfox.documentation.builders.ApiInfoBuilder;
import springfox.documentation.builders.ParameterBuilder;
import springfox.documentation.builders.PathSelectors;
import springfox.documentation.builders.RequestHandlerSelectors;
import springfox.documentation.schema.ModelRef;
import springfox.documentation.service.ApiInfo;
import springfox.documentation.service.Contact;
import springfox.documentation.service.Parameter;
import springfox.documentation.spi.DocumentationType;
import springfox.documentation.spring.web.plugins.Docket;
import springfox.documentation.swagger2.annotations.EnableSwagger2;

@Configuration
@EnableSwagger2
public class SwaggerConfig {
	@Bean
	public Docket buildDocket() {

		ParameterBuilder tokenPar = new ParameterBuilder();
		List<Parameter> pars = new ArrayList<Parameter>();
		tokenPar.name("X-CSRF-TOKEN").description("令牌").modelRef(new ModelRef("string")).parameterType("header")
				.required(false).build();
		pars.add(tokenPar.build());

		return new Docket(DocumentationType.SWAGGER_2).select()
				.apis(RequestHandlerSelectors.withMethodAnnotation(ApiOperation.class)).paths(PathSelectors.any())
				.build().globalOperationParameters(pars).apiInfo(buildApiInf());
	}

	private ApiInfo buildApiInf() {
		return new ApiInfoBuilder().title("****").termsOfServiceUrl("http://www.baidu.cn/")
				.description("API接口")
				.contact(new Contact("baidu", "http://www.baidu.cn/", "zhouzhiwengang@163.com"))
				.version("2.0").build();

	}
}
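Once the application is running, the generated API documentation can be browsed through the springfox UI at /swagger-ui.html, which is also a convenient way to exercise the HDFS endpoints defined below.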

HDFS file-operation service interface and implementation for Hadoop 3.0.3:

package com.zzg.hadoop.service;

import java.util.List;
import java.util.Map;

import org.apache.hadoop.fs.BlockLocation;
import org.springframework.web.multipart.MultipartFile;

/**
 * Common HDFS operations
 * @author zzg
 *
 */
public interface HDFSService {
	/**
	 * Create a folder in HDFS
	 * @param path
	 * @return
	 */
	public boolean mkdirFolder(String path);
	
	/**
	 * Check whether a file or folder exists in HDFS
	 * @param path
	 * @return
	 */
	public boolean existFile(String path);
	
	/**
	 * Read directory information from HDFS
	 * @param path
	 * @return
	 */
	public List<Map<String, Object>> readCatalog(String path);
	
	/**
	 * Create a file in HDFS
	 * @param path
	 * @param file
	 */
	public void createFile(String path, MultipartFile file);
	
	/**
	 * Read the content of an HDFS file
	 * @param path
	 * @return
	 */
	public String readFileContent(String path);
	
	/**
	 * List files under an HDFS path
	 * @param path
	 * @return
	 */
	public List<Map<String, String>> listFile(String path);
	
	/**
	 * Rename an HDFS file
	 * @param oldName
	 * @param newName
	 * @return
	 */
	public boolean renameFile(String oldName, String newName);
	
	/**
	 * Delete an HDFS file
	 * @param path
	 * @return
	 */
	public boolean deleteFile(String path);
	
	/**
	 * Upload a local file to HDFS
	 * @param path
	 * @param uploadPath
	 */
	public void uploadFile(String path, String uploadPath);
	
	/**
	 * Download an HDFS file to the local file system
	 * @param path
	 * @param downloadPath
	 */
	public void downloadFile(String path, String downloadPath);
	
	/**
	 * Copy a file inside HDFS
	 * @param sourcePath
	 * @param targetPath
	 */
	public void copyFile(String sourcePath, String targetPath);
	
	
	/**
	 * Read an HDFS file and return its content as a byte array
	 * @param path
	 * @return
	 */
	public byte[] openFileToBytes(String path); 
	
	/**
	 * Get the BlockLocation information of an HDFS file
	 * @param path
	 * @return
	 * @throws Exception
	 */
	public BlockLocation[] getFileBlockLocations(String path);
}
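The implementation below backs each of these operations with the FileSystem bean defined in HadoopHDFSConfig: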
package com.zzg.hadoop.service.impl;

import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.web.multipart.MultipartFile;
import com.zzg.hadoop.service.HDFSService;

@Service
public class HDFSServiceImpl implements HDFSService {
	// Copy buffer size (64 MB)
	private static final int bufferSize = 1024 * 1024 * 64;

	
	// Logger
	private static final Logger logger = LoggerFactory.getLogger(HDFSServiceImpl.class);

	@Autowired
	private FileSystem fileSystem;

	@Override
	public boolean mkdirFolder(String path) {
		// TODO Auto-generated method stub
		boolean target = false;
		if (StringUtils.isEmpty(path)) {
			return false;
		}
		if (existFile(path)) {
			return true;
		}
		Path src = new Path(path);
		try {
			target = fileSystem.mkdirs(src);
		} catch (IOException e) {
			// TODO Auto-generated catch block
			logger.error(e.getMessage());
		}
		return target;
	}

	@Override
	public boolean existFile(String path) {
		// TODO Auto-generated method stub
		boolean target = false;

		if (StringUtils.isEmpty(path)) {
			return target;
		}
		Path src = new Path(path);
		try {
			target = fileSystem.exists(src);
		} catch (IOException e) {
			// TODO Auto-generated catch block
			logger.error(e.getMessage());
		}
		return target;
	}

	@Override
	public List<Map<String, Object>> readCatalog(String path) {
		// TODO Auto-generated method stub
		if (StringUtils.isEmpty(path)) {
			return null;
		}
		if (!existFile(path)) {
			return null;
		}

		// Target path
		Path newPath = new Path(path);
		FileStatus[] statusList = null;
		try {
			statusList = fileSystem.listStatus(newPath);
		} catch (FileNotFoundException e) {
			// TODO Auto-generated catch block
			logger.error(e.getMessage());
		} catch (IOException e) {
			// TODO Auto-generated catch block
			logger.error(e.getMessage());
		}
		List<Map<String, Object>> list = new ArrayList<>();
		if (null != statusList && statusList.length > 0) {
			for (FileStatus fileStatus : statusList) {
				Map<String, Object> map = new HashMap<>();
				map.put("filePath", fileStatus.getPath());
				map.put("fileStatus", fileStatus.toString());
				list.add(map);
			}
			return list;
		} else {
			return null;
		}

	}

	@Override
	public void createFile(String path, MultipartFile file) {
		// TODO Auto-generated method stub
		if (StringUtils.isEmpty(path)) {
			return;
		}
		String fileName = file.getName();
		Path newPath = new Path(path + "/" + fileName);
		// Open an output stream
		FSDataOutputStream outputStream;
		try {
			outputStream = fileSystem.create(newPath);
			outputStream.write(file.getBytes());
			outputStream.close();
		} catch (IOException e) {
			// TODO Auto-generated catch block
			logger.error(e.getMessage());
		}
	}

	@Override
	public String readFileContent(String path) {
		// TODO Auto-generated method stub
		StringBuffer sb = new StringBuffer();
		if (StringUtils.isEmpty(path)) {
			return null;
		}
		if (!existFile(path)) {
			return null;
		}
		// Target path
		Path srcPath = new Path(path);
		FSDataInputStream inputStream = null;
		try {
			inputStream = fileSystem.open(srcPath);
			// Wrap in a reader to avoid garbled multi-byte characters
			BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream));
			String lineTxt = "";
			while ((lineTxt = reader.readLine()) != null) {
				sb.append(lineTxt);
			}
		}catch(Exception e){
			logger.error(e.getMessage());
		} finally {
			try {
				if (inputStream != null) {
					inputStream.close();
				}
			} catch (IOException e) {
				// TODO Auto-generated catch block
				logger.error(e.getMessage());
			}
		}
		return sb.toString();
	}

	@Override
	public List<Map<String, String>> listFile(String path) {
		// TODO Auto-generated method stub
		List<Map<String, String>> returnList = new ArrayList<>();
		if (StringUtils.isEmpty(path)) {
			return null;
		}
		if (!existFile(path)) {
			return null;
		}
		// Target path
		Path srcPath = new Path(path);
		// Recursively list all files
		try{
			RemoteIterator<LocatedFileStatus> filesList = fileSystem.listFiles(srcPath, true);
			while (filesList.hasNext()) {
				LocatedFileStatus next = filesList.next();
				String fileName = next.getPath().getName();
				Path filePath = next.getPath();
				Map<String, String> map = new HashMap<>();
				map.put("fileName", fileName);
				map.put("filePath", filePath.toString());
				returnList.add(map);
			}
		}catch(Exception e){
			logger.error(e.getMessage());
		}
		return returnList;
	}

	@Override
	public boolean renameFile(String oldName, String newName) {
		// TODO Auto-generated method stub
		boolean target = false;
		if (StringUtils.isEmpty(oldName) || StringUtils.isEmpty(newName)) {
			return false;
		}
		// Original path
		Path oldPath = new Path(oldName);
		// New (renamed) path
		Path newPath = new Path(newName);
		try{
			target = fileSystem.rename(oldPath, newPath);			
		}catch(Exception e){
			logger.error(e.getMessage());
		}
		
		return target;
	}

	@Override
	public boolean deleteFile(String path) {
		// TODO Auto-generated method stub
		boolean target = false;
		if (StringUtils.isEmpty(path)) {
			return false;
		}
		if (!existFile(path)) {
			return false;
		}
		Path srcPath = new Path(path);
		try{
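			// Note: deleteOnExit only removes the path when this FileSystem instance is
			// closed; fileSystem.delete(srcPath, true) would delete immediately.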
			target = fileSystem.deleteOnExit(srcPath);			
		}catch(Exception e){
			logger.error(e.getMessage());
		}

		return target;

	}

	@Override
	public void uploadFile(String path, String uploadPath) {
		// TODO Auto-generated method stub
		if (StringUtils.isEmpty(path) || StringUtils.isEmpty(uploadPath)) {
			return;
		}
		// Local file to upload
		Path clientPath = new Path(path);
		// Target HDFS path
		Path serverPath = new Path(uploadPath);

		// Copy from the local file system to HDFS; the first argument controls
		// whether the source file is deleted (true) or kept (false, the default)
		try {
			fileSystem.copyFromLocalFile(false, clientPath, serverPath);
		} catch (IOException e) {
			// TODO Auto-generated catch block
			logger.error(e.getMessage());
		}

	}

	@Override
	public void downloadFile(String path, String downloadPath) {
		// TODO Auto-generated method stub
		if (StringUtils.isEmpty(path) || StringUtils.isEmpty(downloadPath)) {
			return;
		}
		// HDFS source path
		Path clientPath = new Path(path);
		// Local target path
		Path serverPath = new Path(downloadPath);

		// Copy from HDFS to the local file system; the first argument controls
		// whether the source file is deleted (true) or kept (false, the default)
		try {
			fileSystem.copyToLocalFile(false, clientPath, serverPath);
		} catch (IOException e) {
			// TODO Auto-generated catch block
			logger.error(e.getMessage());
		}

	}

	@Override
	public void copyFile(String sourcePath, String targetPath) {
		// TODO Auto-generated method stub
		if (StringUtils.isEmpty(sourcePath) || StringUtils.isEmpty(targetPath)) {
			return;
		}
		// Source path
		Path oldPath = new Path(sourcePath);
		// Target path
		Path newPath = new Path(targetPath);
 
		FSDataInputStream inputStream = null;
		FSDataOutputStream outputStream = null;
		try {
			try{
				inputStream = fileSystem.open(oldPath);
				outputStream = fileSystem.create(newPath);
				
				IOUtils.copyBytes(inputStream, outputStream, bufferSize, false);				
			}catch(Exception e){
				logger.error(e.getMessage());
			}
		} finally {
			try{
				if (inputStream != null) {
					inputStream.close();
				}
				if (outputStream != null) {
					outputStream.close();
				}
			}catch(Exception e){
				logger.error(e.getMessage());
			}

		}

	}

	@Override
	public byte[] openFileToBytes(String path) {
		// TODO Auto-generated method stub
		 byte[] bytes= null;
		if (StringUtils.isEmpty(path)) {
			return null;
		}
		if (!existFile(path)) {
			return null;
		}
		// Target path
		Path srcPath = new Path(path);
		try {
			FSDataInputStream inputStream = fileSystem.open(srcPath);
			bytes = IOUtils.readFullyToByteArray(inputStream);
		}catch(Exception e){
			logger.error(e.getMessage());
		}
		return bytes;

	}

	@Override
	public BlockLocation[] getFileBlockLocations(String path) {
		// TODO Auto-generated method stub
		BlockLocation[] blocks = null;
		if (StringUtils.isEmpty(path)) {
			return null;
		}
		if (!existFile(path)) {
			return null;
		}
		// Target path
		Path srcPath = new Path(path);
		try{
			FileStatus fileStatus = fileSystem.getFileStatus(srcPath);
			blocks = fileSystem.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
		}catch(Exception e){
			logger.error(e.getMessage());
		}
		return blocks;

	}

}
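As a quick sanity check, the service can be exercised with spring-boot-starter-test. The following is only a minimal sketch, assuming the HDFS and MySQL endpoints configured in application.properties are reachable; the class name and the /test/demo path are illustrative:

package com.zzg.hadoop.service;

import static org.junit.Assert.assertTrue;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

/**
 * Minimal, illustrative test sketch for the HDFS service.
 * Requires a running HDFS reachable at hdfs.hdfsPath.
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class HDFSServiceTest {

	@Autowired
	private HDFSService service;

	@Test
	public void mkdirAndCheck() {
		// Create a folder, then verify it is visible
		assertTrue(service.mkdirFolder("/test/demo"));
		assertTrue(service.existFile("/test/demo"));
	}
}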

A controller for exercising the HDFS file operations of Hadoop 3.0.3 over REST:

package com.zzg.hadoop.controller;

import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.List;
import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.mock.web.MockMultipartFile;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.multipart.MultipartFile;

import com.alibaba.fastjson.JSONObject;
import com.zzg.hadoop.config.HadoopHDFSConfig;
import com.zzg.hadoop.service.HDFSService;
import com.zzg.jreport.response.JreportResponse;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;

@Controller
@RequestMapping("/api/hadoop/hdfs")
@Api(value = "HDFS Controller", tags = "HDFS operations")
public class HDFSController {
	// Logger
	private static final Logger logger = LoggerFactory.getLogger(HDFSController.class);	
	@Autowired
	private HDFSService service;
	
	/**
	 * Note: folders created this way may end up with insufficient permissions;
	 * HDFS permissions may need to be adjusted.
	 * @param entity
	 * @return
	 */
	
	@ApiOperation(httpMethod = "POST", value = "Create folder")
	@RequestMapping(value = "/mkdirFolder", method = { RequestMethod.POST }, produces = "application/json;charset=UTF-8")
	@ResponseBody
	public JreportResponse mkdirFolder(
			@RequestBody @ApiParam(name = "JSON object", value = "JSON request body", required = true) JSONObject entity) {
		boolean target = service.mkdirFolder(entity.getString("path"));
		return JreportResponse.ok(target);
		
	}
	
	@ApiOperation(httpMethod = "POST", value = "Check whether a path exists")
	@RequestMapping(value = "/existFile", method = { RequestMethod.POST }, produces = "application/json;charset=UTF-8")
	@ResponseBody
	public JreportResponse existFile(
			@RequestBody @ApiParam(name = "JSON object", value = "JSON request body", required = true) JSONObject entity) {
		boolean target = service.existFile(entity.getString("path"));
		return JreportResponse.ok(target);
	}
	
	@ApiOperation(httpMethod = "POST", value = "Read directory")
	@RequestMapping(value = "/readCatalog", method = { RequestMethod.POST }, produces = "application/json;charset=UTF-8")
	@ResponseBody
	public JreportResponse readCatalog(
			@RequestBody @ApiParam(name = "JSON object", value = "JSON request body", required = true) JSONObject entity) {
		 List<Map<String, Object>> list = service.readCatalog(entity.getString("path"));
		return JreportResponse.ok(list);
	}
	
	@ApiOperation(httpMethod = "POST", value = "Create file")
	@RequestMapping(value = "/createFile", method = { RequestMethod.POST }, produces = "application/json;charset=UTF-8")
	@ResponseBody
	public JreportResponse createFile(
			@RequestBody @ApiParam(name = "JSON object", value = "JSON request body", required = true) JSONObject entity) {
		FileInputStream inputStream = null;
		MultipartFile file = null;
		try {
			inputStream = new FileInputStream("C:/data/words.txt");
			file = new MockMultipartFile("test.txt", inputStream);
		} catch (Exception e) {
			// TODO Auto-generated catch block
			logger.error(e.getMessage());
		}finally{
			try {
				if (inputStream != null) {
					inputStream.close();
				}
			} catch (IOException e) {
				// TODO Auto-generated catch block
				logger.error(e.getMessage());
			}
		}
		service.createFile(entity.getString("path"),file);
		return JreportResponse.ok();
	}
	
	@ApiOperation(httpMethod = "POST", value = "Read file content")
	@RequestMapping(value = "/readFileContent", method = { RequestMethod.POST }, produces = "application/json;charset=UTF-8")
	@ResponseBody
	public JreportResponse readFileContent(
			@RequestBody @ApiParam(name = "JSON object", value = "JSON request body", required = true) JSONObject entity) {
		String content = service.readFileContent(entity.getString("path"));
		return JreportResponse.ok(content);
	}
	
	@ApiOperation(httpMethod = "POST", value = "List files")
	@RequestMapping(value = "/listFile", method = { RequestMethod.POST }, produces = "application/json;charset=UTF-8")
	@ResponseBody
	public JreportResponse listFile(
			@RequestBody @ApiParam(name = "JSON object", value = "JSON request body", required = true) JSONObject entity) {
		List<Map<String, String>> list = service.listFile(entity.getString("path"));
		return JreportResponse.ok(list);
	}
	
	@ApiOperation(httpMethod = "POST", value = "Rename file")
	@RequestMapping(value = "/renameFile", method = { RequestMethod.POST }, produces = "application/json;charset=UTF-8")
	@ResponseBody
	public JreportResponse renameFile(
			@RequestBody @ApiParam(name = "JSON object", value = "JSON request body", required = true) JSONObject entity) {
		boolean target = service.renameFile(entity.getString("oldName"),entity.getString("newName"));
		return JreportResponse.ok(target);
	}
	
	/**
	 * Note: deleting the specified file did not appear to take effect; the likely cause is
	 * that the service uses deleteOnExit, which only deletes when the FileSystem is closed.
	 * @param entity
	 * @return
	 */
	@ApiOperation(httpMethod = "POST", value = "Delete file")
	@RequestMapping(value = "/deleteFile", method = { RequestMethod.POST }, produces = "application/json;charset=UTF-8")
	@ResponseBody
	public JreportResponse deleteFile(
			@RequestBody @ApiParam(name = "JSON object", value = "JSON request body", required = true) JSONObject entity) {
		boolean target = service.deleteFile(entity.getString("path"));
		return JreportResponse.ok(target);
	}
	
	
	@ApiOperation(httpMethod = "POST", value = "Upload file")
	@RequestMapping(value = "/uploadFile", method = { RequestMethod.POST }, produces = "application/json;charset=UTF-8")
	@ResponseBody
	public JreportResponse uploadFile(
			@RequestBody @ApiParam(name = "JSON object", value = "JSON request body", required = true) JSONObject entity) {
		service.uploadFile(entity.getString("path"), entity.getString("uploadPath"));
		return JreportResponse.ok();
	}
	
	
	
	

}
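Each endpoint takes a small JSON body, for example {"path": "/test/demo"} for mkdirFolder and existFile, or an oldName/newName pair for renameFile. A minimal MockMvc sketch, again assuming a reachable HDFS and database and using an illustrative path:

package com.zzg.hadoop.controller;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.http.MediaType;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.test.web.servlet.MockMvc;
import org.springframework.test.web.servlet.request.MockMvcRequestBuilders;
import org.springframework.test.web.servlet.result.MockMvcResultMatchers;

/**
 * Minimal, illustrative controller test sketch.
 */
@RunWith(SpringRunner.class)
@SpringBootTest
@AutoConfigureMockMvc
public class HDFSControllerTest {

	@Autowired
	private MockMvc mockMvc;

	@Test
	public void mkdirFolder() throws Exception {
		// POST a JSON body with the target path and expect HTTP 200
		mockMvc.perform(MockMvcRequestBuilders.post("/api/hadoop/hdfs/mkdirFolder")
				.contentType(MediaType.APPLICATION_JSON)
				.content("{\"path\": \"/test/demo\"}"))
				.andExpect(MockMvcResultMatchers.status().isOk());
	}
}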

The remaining business-layer domain, mapper, service, serviceImpl, and controller classes are omitted.

application.properties:

# Server port
server.port=7090
# Service context path (disabled)
# server.context-path=/jreport
# MyBatis mapper XML locations
mybatis.mapper-locations=classpath*:mapper/hadoop/*Mapper.xml
mybatis.type-aliases-package=com.zzg.hadoop.domain
# MyBatis / MySQL 8 datasource configuration
spring.datasource.url=jdbc:mysql://192.168.**.**:3306/boot-security?serverTimezone=UTC&useSSL=false&allowPublicKeyRetrieval=true&allowMultiQueries=true&nullCatalogMeansCurrent=true
spring.datasource.username=root
spring.datasource.password=******
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
# Druid connection pool configuration
# Number of physical connections created at initialization
spring.datasource.druid.initial-size=5
# Maximum number of active connections
spring.datasource.druid.max-active=30
# Minimum number of idle connections
spring.datasource.druid.min-idle=5
# Maximum wait time when acquiring a connection, in milliseconds
spring.datasource.druid.max-wait=60000
# Interval between idle-connection eviction runs, in milliseconds
spring.datasource.druid.time-between-eviction-runs-millis=60000
# Minimum time a connection must stay idle before it may be evicted
spring.datasource.druid.min-evictable-idle-time-millis=300000
# SQL used to validate connections; must be a query statement
spring.datasource.druid.validation-query=SELECT 1 FROM DUAL
# Recommended true; does not hurt performance and improves safety: when borrowing a connection
# that has been idle longer than timeBetweenEvictionRunsMillis, run validationQuery to check it
spring.datasource.druid.test-while-idle=true
# Run validationQuery when borrowing a connection; enabling this lowers performance
spring.datasource.druid.test-on-borrow=false
# Run validationQuery when returning a connection; enabling this lowers performance
spring.datasource.druid.test-on-return=false
# Cache prepared statements (PSCache). PSCache greatly improves performance on databases
# with cursor support such as Oracle; it is usually disabled on MySQL
spring.datasource.druid.pool-prepared-statements=true
# To enable PSCache this must be greater than 0; poolPreparedStatements is then switched to true automatically
spring.datasource.druid.max-pool-prepared-statement-per-connection-size=50
# Filters for monitoring statistics; without "stat" the monitoring console cannot collect SQL statistics
#spring.datasource.druid.filters=stat,wall
# Enable mergeSql and slow-SQL logging through connectProperties
spring.datasource.druid.connection-properties=druid.stat.mergeSql=true;druid.stat.slowSqlMillis=500
# Merge monitoring data from multiple DruidDataSource instances
spring.datasource.druid.use-global-data-source-stat=true
# Enable the stat filter
spring.datasource.druid.filters=stat
# External logging configuration (disabled; log4j.properties is used instead)
#logging.config=classpath:logback.xml

# Hadoop HDFS connection parameters
hdfs.hdfsPath=hdfs://192.168.60.204:9000
hdfs.hdfsName=root
log4j.properties:

#log4j.rootLogger=CONSOLE,info,error,DEBUG
log4j.rootLogger=info,error,CONSOLE,DEBUG

log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender     
log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout     
log4j.appender.CONSOLE.layout.ConversionPattern=%d{yyyy-MM-dd-HH-mm} [%t] [%c] [%p] - %m%n     
      
log4j.logger.info=info
log4j.appender.info=org.apache.log4j.DailyRollingFileAppender
log4j.appender.info.layout=org.apache.log4j.PatternLayout     
log4j.appender.info.layout.ConversionPattern=%d{yyyy-MM-dd-HH-mm} [%t] [%c] [%p] - %m%n  
log4j.appender.info.datePattern='.'yyyy-MM-dd
log4j.appender.info.Threshold = info   
log4j.appender.info.append=true   
#log4j.appender.info.File=/home/admin/pms-api-services/logs/info/api_services_info
log4j.appender.info.File=C:/logs/hadoop-hdfs/logs/info/api_services_info

log4j.logger.error=error  
log4j.appender.error=org.apache.log4j.DailyRollingFileAppender
log4j.appender.error.layout=org.apache.log4j.PatternLayout     
log4j.appender.error.layout.ConversionPattern=%d{yyyy-MM-dd-HH-mm} [%t] [%c] [%p] - %m%n  
log4j.appender.error.datePattern='.'yyyy-MM-dd
log4j.appender.error.Threshold = error   
log4j.appender.error.append=true   
#log4j.appender.error.File=/home/admin/pms-api-services/logs/error/api_services_error
log4j.appender.error.File=/C:/logs/hadoop-hdfs/logs/error/api_services_error

log4j.logger.DEBUG=DEBUG
log4j.appender.DEBUG=org.apache.log4j.DailyRollingFileAppender
log4j.appender.DEBUG.layout=org.apache.log4j.PatternLayout     
log4j.appender.DEBUG.layout.ConversionPattern=%d{yyyy-MM-dd-HH-mm} [%t] [%c] [%p] - %m%n  
log4j.appender.DEBUG.datePattern='.'yyyy-MM-dd
log4j.appender.DEBUG.Threshold = DEBUG   
log4j.appender.DEBUG.append=true   
#log4j.appender.DEBUG.File=/home/admin/pms-api-services/logs/debug/api_services_debug
log4j.appender.DEBUG.File=C:/logs/hadoop-hdfs/logs/debug/api_services_debug

### Debug
log4j.logger.com.ibatis=DEBUG
log4j.logger.com.ibatis.common.jdbc.SimpleDataSource=DEBUG
log4j.logger.com.ibatis.common.jdbc.ScriptRunner=DEBUG
log4j.logger.com.ibatis.sqlmap.engine.impl.SqlMapClientDelegate=DEBUG
log4j.logger.java.sql.Connection=DEBUG
log4j.logger.java.sql.Statement=DEBUG
log4j.logger.java.sql.PreparedStatement=DEBUG

Application entry point:

package com.zzg.hadoop;


import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.boot.web.servlet.support.SpringBootServletInitializer;
import org.springframework.transaction.annotation.EnableTransactionManagement;

@SpringBootApplication 
@EnableTransactionManagement
@MapperScan("com.zzg.hadoop.mapper")
public class Application extends SpringBootServletInitializer {

	public static void main(String[] args) {
		// TODO Auto-generated method stub
		SpringApplication.run(Application.class, args);
		System.out.println("============= SpringBoot hadoop hdfs Service Start Success =============");
	}
	
	@Override
	protected SpringApplicationBuilder configure(SpringApplicationBuilder builder) {
		// Must point to the same Application class that is started via main()
		return builder.sources(Application.class);
	}

}

(Project structure screenshot omitted.)

Copyright notice: this article comes from CSDN; thanks to the original author. It is licensed under CC 4.0 BY-SA; please include the original link and this notice when reposting.
Original article: https://blog.csdn.net/zhouzhiwengang/article/details/103224394