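Cause
The ftp package under SeaTunnel's connector-file module does not encode the file path when it reads files, so any path that contains Chinese characters comes out garbled.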
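The effect can be reproduced without an FTP server. The sketch below assumes the control connection encodes commands as ISO-8859-1 (the default control encoding in Apache Commons Net, which supplies the FTPClient class used here); the class name, helper name, and sample path are hypothetical. It shows that encoding a Chinese path directly loses the characters, while wrapping the UTF-8 bytes in an ISO-8859-1 string puts the original UTF-8 bytes on the wire.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class FtpPathEncodingDemo {

    // Hypothetical helper: wraps the UTF-8 bytes of a path in an ISO-8859-1 string.
    // Each byte becomes one char in U+0000..U+00FF, so no information is lost.
    static String toIsoWrappedUtf8(String path) {
        return new String(path.getBytes(StandardCharsets.UTF_8), StandardCharsets.ISO_8859_1);
    }

    public static void main(String[] args) {
        // "\u4e2d\u6587" is the two-character string for "Chinese" (zhongwen).
        String path = "/data/\u4e2d\u6587/report.csv";
        byte[] utf8 = path.getBytes(StandardCharsets.UTF_8);

        // Encoding the raw path as ISO-8859-1 replaces unmappable characters.
        byte[] naive = path.getBytes(StandardCharsets.ISO_8859_1);

        // Encoding the wrapped path as ISO-8859-1 yields the original UTF-8 bytes.
        byte[] wire = toIsoWrappedUtf8(path).getBytes(StandardCharsets.ISO_8859_1);

        System.out.println("raw path survives:     " + Arrays.equals(naive, utf8));
        System.out.println("wrapped path survives: " + Arrays.equals(wire, utf8));
    }
}
```

Whether the server then resolves the path still depends on the server interpreting those bytes as UTF-8.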
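Modifying the source
Two places need to change: the method that checks whether a path exists, and the place where the file stream is opened for reading.
Modifying the file-existence check
The file to edit is org/apache/seatunnel/connectors/seatunnel/file/ftp/system/SeaTunnelFTPFileSystem.java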
private FileStatus getFileStatus(FTPClient client, Path file) throws IOException {
    FileStatus fileStat = null;
    Path workDir = new Path(client.printWorkingDirectory());
    Path absolute = makeAbsolute(workDir, file);
    Path parentPath = absolute.getParent();
    if (parentPath == null) { // root dir
        long length = -1; // Length of root dir on server not known
        boolean isDir = true;
        int blockReplication = 1;
        long blockSize = DEFAULT_BLOCK_SIZE; // Block Size not known.
        long modTime = -1; // Modification time of root dir not known.
        Path root = new Path("/");
        return new FileStatus(
                length, isDir, blockReplication, blockSize, modTime, root.makeQualified(this));
    }
    String pathName = parentPath.toUri().getPath();
    FTPFile[] ftpFiles = client.listFiles(pathName);
    // TODO: modified section
    //region
    // Retry with a re-encoded path when the first listing returned null
    if (ftpFiles == null) {
        client.setControlEncoding("UTF-8");
        client.sendCommand("OPTS UTF8", "ON");
        client.enterLocalActiveMode();
        // Re-encode the path: UTF-8 bytes wrapped in an ISO-8859-1 string
        ftpFiles = client.listFiles(new String(pathName.getBytes("UTF-8"), "ISO-8859-1"));
    }
    //endregion
    if (ftpFiles != null) {
        for (FTPFile ftpFile : ftpFiles) {
            if (ftpFile.getName().equals(file.getName())) { // file found in dir
                fileStat = getFileStatus(ftpFile, parentPath);
                break;
            }
        }
        if (fileStat == null) {
            throw new FileNotFoundException("File " + file + " does not exist.");
        }
    } else {
        throw new FileNotFoundException("File " + file + " does not exist.");
    }
    return fileStat;
}
Modifying where the file stream is obtained
The open method has the same problem and needs the same kind of change.
After the change:
@Override
public FSDataInputStream open(Path file, int bufferSize) throws IOException {
    FTPClient client = connect();
    Path workDir = new Path(client.printWorkingDirectory());
    Path absolute = makeAbsolute(workDir, file);
    FileStatus fileStat = getFileStatus(client, absolute);
    if (fileStat.isDirectory()) {
        disconnect(client);
        throw new FileNotFoundException("Path " + file + " is a directory.");
    }
    client.allocate(bufferSize);
    Path parent = absolute.getParent();
    // Change to parent directory on the server. Only then can we read the file
    // on the server by opening up an InputStream. As a side effect the working
    // directory on the server is changed to the parent directory of the file.
    // The FTP client connection is closed when close() is called on the
    // FSDataInputStream.
    boolean hasChangedDirectory = client.changeWorkingDirectory(parent.toUri().getPath());
    //region
    // TODO: modified section
    // Check whether the switch to the working directory succeeded; a failure
    // may be caused by Chinese characters in the path, so retry re-encoded.
    if (!hasChangedDirectory) {
        String newPath = new String(parent.toUri().getPath().getBytes("UTF-8"), "ISO-8859-1");
        client.changeWorkingDirectory(newPath);
    }
    InputStream is = client.retrieveFileStream(file.getName());
    if (is == null) {
        // Same retry for the file name itself
        String newFileName = new String(file.getName().getBytes("UTF-8"), "ISO-8859-1");
        is = client.retrieveFileStream(newFileName);
    }
    //endregion
    FSDataInputStream fis = new FSDataInputStream(new FTPInputStream(is, client, statistics));
    if (!FTPReply.isPositivePreliminary(client.getReplyCode())) {
        // The ftpClient is an inconsistent state. Must close the stream
        // which in turn will logout and disconnect from FTP server
        fis.close();
        throw new IOException("Unable to open file: " + file + ", Aborting");
    }
    return fis;
}
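Both modifications follow the same pattern: try the operation with the raw path, and if it yields nothing, try again with the re-encoded path. A minimal sketch of that pattern as a reusable helper is shown below. It is not part of the SeaTunnel source; the class and method names are hypothetical, a null result is assumed to mean failure, and a plain Function stands in for the FTP call (a real FTPClient call throws IOException, so actual use would need a functional interface that allows it).

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

public class EncodedPathRetry {

    // Wraps the UTF-8 bytes of the path in an ISO-8859-1 string, as the patch does.
    static String reencode(String path) {
        return new String(path.getBytes(StandardCharsets.UTF_8), StandardCharsets.ISO_8859_1);
    }

    // Runs the operation with the raw path first. A null result is treated as
    // failure, and the operation is retried once with the re-encoded path.
    static <T> T withEncodedFallback(String path, Function<String, T> op) {
        T result = op.apply(path);
        return result != null ? result : op.apply(reencode(path));
    }

    public static void main(String[] args) {
        String name = "\u4e2d\u6587.txt"; // a Chinese file name
        String wireName = reencode(name);

        // Stand-in for an FTP call: it only recognizes the re-encoded name
        // and returns null for anything else.
        Function<String, String> fakeRetrieve =
                p -> p.equals(wireName) ? "stream" : null;

        System.out.println(withEncodedFallback(name, fakeRetrieve));
    }
}
```

With such a helper, the retry logic in getFileStatus and open would live in one place instead of being repeated at every call site.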
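It is advisable to make the corresponding change in the config as well.
Once the changes are done, build the package and place it in the connector folder on the Linux machine.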