Sunday, 24 September 2017

FTP files into Apache Cassandra with Apache FtpServer

Apache FtpServer provides an API that lets you implement your own file system to back file uploads and downloads. Using the native file system implementation as a guide, this project builds on a previous blog post, Another Apache Cassandra File System. That post implements a chunked file system with persistence provided by Apache Cassandra; the code here uses it to read and write FTP files directly from and to the database.

Before using the code in this blog post, make sure you clone and build the file system project, and stand up the Cassandra database, from this git repository - it is a required dependency.

To create an alternative file system, you need to implement three interfaces from the FtpServer ftplet-api: FileSystemFactory, FileSystemView and FtpFile.

CassandraFileSystemFactory.java

package org.adrianwalker.ftpserver.filesystem;

import org.adrianwalker.cassandra.filesystem.controller.FileSystemController;
import org.apache.ftpserver.ftplet.FileSystemFactory;
import org.apache.ftpserver.ftplet.FileSystemView;
import org.apache.ftpserver.ftplet.FtpException;
import org.apache.ftpserver.ftplet.User;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link FileSystemFactory} that hands out {@link CassandraFileSystemView} instances, all of
 * which share the single {@link FileSystemController} supplied at construction time.
 */
public final class CassandraFileSystemFactory implements FileSystemFactory {

  private static final Logger LOGGER = LoggerFactory.getLogger(CassandraFileSystemFactory.class);

  private final FileSystemController controller;

  /**
   * Creates a factory bound to the given controller.
   *
   * @param controller controller passed on to every view this factory creates
   * @throws IllegalArgumentException if {@code controller} is {@code null}
   */
  public CassandraFileSystemFactory(final FileSystemController controller) {

    LOGGER.debug("controller = {}", controller);

    this.controller = requireArgument(controller, "controller is null");
  }

  /**
   * Creates a new file system view for the given user.
   *
   * @param user user the view is created for
   * @return a new {@link CassandraFileSystemView}
   * @throws IllegalArgumentException if {@code user} is {@code null}
   */
  @Override
  public FileSystemView createFileSystemView(final User user) throws FtpException {

    LOGGER.debug("user = {}", user);

    final User viewOwner = requireArgument(user, "user is null");

    return new CassandraFileSystemView(viewOwner, controller);
  }

  /** Returns {@code value} unchanged, or fails with {@code message} when it is {@code null}. */
  private static <T> T requireArgument(final T value, final String message) {

    if (value != null) {
      return value;
    }

    throw new IllegalArgumentException(message);
  }
}

CassandraFileSystemView.java

package org.adrianwalker.ftpserver.filesystem;

import static java.io.File.separator;

import org.adrianwalker.cassandra.filesystem.controller.FileSystemController;
import org.adrianwalker.cassandra.filesystem.entity.File;
import org.apache.ftpserver.ftplet.FileSystemView;
import org.apache.ftpserver.ftplet.FtpException;
import org.apache.ftpserver.ftplet.FtpFile;
import org.apache.ftpserver.ftplet.User;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.file.Path;
import java.nio.file.Paths;

/**
 * {@link FileSystemView} for a single user, resolving FTP paths against files held by a
 * {@link FileSystemController}.
 *
 * <p>The view tracks a working directory, which starts out as the user's home directory, and
 * resolves relative names against it.
 */
public final class CassandraFileSystemView implements FileSystemView {

  private static final Logger LOGGER = LoggerFactory.getLogger(CassandraFileSystemView.class);

  private final User user;
  private final FileSystemController controller;

  private final String homeDirectory;
  private String workingDirectory;

  /**
   * Creates a view whose working directory is initially the user's home directory.
   *
   * @param user user the view belongs to
   * @param controller controller used to look up and save files
   * @throws IllegalArgumentException if {@code user} or {@code controller} is {@code null}
   */
  public CassandraFileSystemView(final User user, final FileSystemController controller) {

    LOGGER.debug("user = {}, controller = {}", user, controller);

    if (null == user) {
      throw new IllegalArgumentException("user is null");
    }

    if (null == controller) {
      throw new IllegalArgumentException("controller is null");
    }

    this.user = user;
    this.controller = controller;

    this.homeDirectory = user.getHomeDirectory();
    this.workingDirectory = homeDirectory;
  }

  /**
   * Returns the user's home directory, creating it first if it does not exist yet.
   *
   * @return the home directory
   * @throws FtpException declared by the interface
   */
  @Override
  public FtpFile getHomeDirectory() throws FtpException {

    LOGGER.debug("homeDirectory = {}", homeDirectory);

    return getOrCreateDirectory(homeDirectory);
  }

  /**
   * Returns the current working directory, creating it first if it does not exist yet.
   *
   * @return the working directory
   * @throws FtpException declared by the interface
   */
  @Override
  public FtpFile getWorkingDirectory() throws FtpException {

    LOGGER.debug("workingDirectory = {}", workingDirectory);

    return getOrCreateDirectory(workingDirectory);
  }

  /**
   * Changes the working directory, provided the target exists and is a directory.
   *
   * @param workingDirectory absolute path, or a path relative to the current working directory
   * @return {@code true} if the working directory was changed, {@code false} otherwise
   * @throws FtpException declared by the interface
   */
  @Override
  public boolean changeWorkingDirectory(final String workingDirectory) throws FtpException {

    LOGGER.debug("workingDirectory = {}", workingDirectory);

    FtpFile file = getFile(workingDirectory);

    // Existence alone is not enough: a regular file must never become the working directory.
    boolean changed = file.doesExist() && file.isDirectory();

    if (changed) {
      this.workingDirectory = file.getAbsolutePath();
    }

    return changed;
  }

  /**
   * Resolves a name to a file. The returned object is created even when the controller has no
   * file at the resolved path.
   *
   * @param name absolute path, or a path relative to the current working directory
   * @return file object for the normalized path
   * @throws IllegalArgumentException if {@code name} is {@code null}
   * @throws FtpException declared by the interface
   */
  @Override
  public FtpFile getFile(final String name) throws FtpException {

    LOGGER.debug("name = {}", name);

    if (null == name) {
      throw new IllegalArgumentException("name is null");
    }

    String path = normalize(name);
    File file = controller.getFile(path);

    return new CassandraFtpFile(user, path, file, controller);
  }

  /**
   * Random access is not supported by this view.
   *
   * @return always {@code false}
   */
  @Override
  public boolean isRandomAccessible() throws FtpException {

    return false;
  }

  /** Nothing to release; the controller is not owned by this view. */
  @Override
  public void dispose() {
  }

  /**
   * Looks up a directory and creates it when nothing exists at its path.
   *
   * <p>The directory is created under the normalized path returned by the lookup, so the next
   * lookup finds the entry that was just created.
   */
  private FtpFile getOrCreateDirectory(final String directory) throws FtpException {

    FtpFile file = getFile(directory);

    if (!file.doesExist()) {
      file = createDirectory(file.getAbsolutePath());
    }

    return file;
  }

  /**
   * Turns a name into a normalized path; names that do not start with the separator are resolved
   * against the working directory.
   *
   * <p>NOTE(review): this relies on {@code java.io.File.separator} and {@code Paths}, so the
   * result follows the host platform's path conventions, whereas FTP clients send {@code /}.
   */
  private String normalize(final String name) {

    LOGGER.debug("name = {}", name);

    Path path;
    if (name.startsWith(separator)) {
      path = Paths.get(name);
    } else {
      path = Paths.get(workingDirectory, name);
    }

    String normalizedName = path
            .normalize()
            .toString();

    LOGGER.debug("normalizedName = {}", normalizedName);

    return normalizedName;
  }

  /** Saves a new directory entry, owned by the user, at the given path. */
  private FtpFile createDirectory(final String path) {

    LOGGER.debug("path = {}", path);

    // getFileName() returns null for a root path, which has no name elements; fall back to the
    // path itself instead of failing with a NullPointerException.
    Path fileName = Paths.get(path).getFileName();

    File directory = new File();
    directory.setName(null == fileName ? path : fileName.toString());
    directory.setDirectory(true);
    directory.setOwner(user.getName());
    directory.setGroup(user.getName());
    directory.setModified(System.currentTimeMillis());

    if (!controller.saveFile(path, directory)) {
      LOGGER.warn("directory could not be saved, path = {}", path);
    }

    return new CassandraFtpFile(user, path, directory, controller);
  }
}

CassandraFtpFile.java

package org.adrianwalker.ftpserver.filesystem;

import static java.util.stream.Collectors.toList;

import org.adrianwalker.cassandra.filesystem.controller.FileSystemController;
import org.adrianwalker.cassandra.filesystem.entity.File;
import org.apache.ftpserver.ftplet.FtpFile;
import org.apache.ftpserver.ftplet.User;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Paths;
import java.util.List;

/**
 * {@link FtpFile} for one path, backed by a file entity handled by a
 * {@link FileSystemController}.
 *
 * <p>The entity is {@code null} while nothing exists at the path; in that state
 * {@link #doesExist()} returns {@code false} and the metadata accessors return neutral values
 * instead of throwing.
 */
public final class CassandraFtpFile implements FtpFile {

  private static final Logger LOGGER = LoggerFactory.getLogger(CassandraFtpFile.class);

  private final User user;
  private final String path;
  private File file;
  private final FileSystemController controller;

  /**
   * Creates a file object for the given path.
   *
   * @param user user accessing the file
   * @param path absolute path of the file
   * @param file entity found at the path, or {@code null} if nothing exists there
   * @param controller controller used to read and modify the file
   * @throws IllegalArgumentException if {@code user}, {@code path} or {@code controller} is
   *     {@code null}
   */
  public CassandraFtpFile(
          final User user,
          final String path,
          final File file,
          final FileSystemController controller) {

    LOGGER.debug("user = {}, path = {}, file = {}, controller = {}", user, path, file, controller);

    if (null == user) {
      throw new IllegalArgumentException("user is null");
    }

    if (null == path) {
      throw new IllegalArgumentException("path is null");
    }

    if (null == controller) {
      throw new IllegalArgumentException("controller is null");
    }

    this.user = user;
    this.path = path;
    this.file = file;
    this.controller = controller;
  }

  /** @return the path this object was created with */
  @Override
  public String getAbsolutePath() {

    LOGGER.debug("path = {}", path);

    return path;
  }

  /** @return the entity's name, or the last element of the path if the file does not exist */
  @Override
  public String getName() {

    String name = null != file ? file.getName() : baseName();
    LOGGER.debug("name = {}", name);

    return name;
  }

  /** @return the entity's hidden flag, or {@code false} if the file does not exist */
  @Override
  public boolean isHidden() {

    boolean hidden = null != file && file.isHidden();
    LOGGER.debug("hidden = {}", hidden);

    return hidden;
  }

  /** @return the entity's directory flag, or {@code false} if the file does not exist */
  @Override
  public boolean isDirectory() {

    boolean directory = null != file && file.isDirectory();
    LOGGER.debug("directory = {}", directory);

    return directory;
  }

  /** @return {@code true} if the file exists and is not a directory */
  @Override
  public boolean isFile() {

    boolean regularFile = null != file && !file.isDirectory();
    LOGGER.debug("file = {}", regularFile);

    return regularFile;
  }

  /** @return {@code true} if an entity is present for this path */
  @Override
  public boolean doesExist() {

    boolean exists = file != null;
    LOGGER.debug("exists = {}", exists);

    return exists;
  }

  /** @return {@code true} if the file exists */
  @Override
  public boolean isReadable() {

    boolean readable = doesExist();
    LOGGER.debug("readable = {}", readable);

    return readable;
  }

  /** @return {@code true} if the path is the user's home directory or lies beneath it */
  @Override
  public boolean isWritable() {

    // Compare whole path elements: a plain string prefix test would also accept a sibling such
    // as "/testuser2" for the home directory "/testuser".
    boolean writable = Paths.get(path).startsWith(user.getHomeDirectory());
    LOGGER.debug("writable = {}", writable);

    return writable;
  }

  /** @return {@code true} if the file exists and is writable */
  @Override
  public boolean isRemovable() {

    boolean removable = doesExist() && isWritable();
    LOGGER.debug("removable = {}", removable);

    return removable;
  }

  /**
   * @return the entity's owner, or the user's name if the file does not exist (the owner this
   *     class assigns when it creates a file)
   */
  @Override
  public String getOwnerName() {

    String owner = null != file ? file.getOwner() : user.getName();
    LOGGER.debug("owner = {}", owner);

    return owner;
  }

  /**
   * @return the entity's group, or the user's name if the file does not exist (the group this
   *     class assigns when it creates a file)
   */
  @Override
  public String getGroupName() {

    String group = null != file ? file.getGroup() : user.getName();
    LOGGER.debug("group = {}", group);

    return group;
  }

  /** @return 2 for an existing directory, otherwise 1 */
  @Override
  public int getLinkCount() {

    int linkCount = null != file && file.isDirectory() ? 2 : 1;
    LOGGER.debug("linkCount = {}", linkCount);

    return linkCount;
  }

  /** @return the entity's modified timestamp, or 0 if the file does not exist */
  @Override
  public long getLastModified() {

    long lastModified = null != file ? file.getModified() : 0L;
    LOGGER.debug("lastModified = {}", lastModified);

    return lastModified;
  }

  /**
   * Updates the modified timestamp and saves the entity.
   *
   * @param lastModified new timestamp
   * @return the controller's save result, or {@code false} if the file does not exist
   */
  @Override
  public boolean setLastModified(final long lastModified) {

    LOGGER.debug("lastModified = {}", lastModified);

    if (null == file) {
      return false;
    }

    file.setModified(lastModified);

    return controller.saveFile(path, file);
  }

  /** @return the entity's size, or 0 if the file does not exist */
  @Override
  public long getSize() {

    long size = null != file ? file.getSize() : 0L;
    LOGGER.debug("size = {}", size);

    return size;
  }

  /** @return the backing entity, or {@code null} if the file does not exist */
  @Override
  public Object getPhysicalFile() {

    LOGGER.debug("file = {}", file);

    return file;
  }

  /**
   * Saves a directory entry, owned by the user, at this path.
   *
   * @return the controller's save result
   */
  @Override
  public boolean mkdir() {

    LOGGER.debug("path = {}", path);

    File directory = new File();
    directory.setName(baseName());
    directory.setDirectory(true);
    directory.setOwner(user.getName());
    directory.setGroup(user.getName());
    // Stamp the directory like the other creation paths do.
    directory.setModified(System.currentTimeMillis());

    boolean created = controller.saveFile(path, directory);

    if (created) {
      // Keep this object in step with the store so doesExist() reports the new directory.
      file = directory;
    }

    return created;
  }

  /**
   * Deletes whatever the controller holds at this path.
   *
   * @return the controller's delete result
   */
  @Override
  public boolean delete() {

    LOGGER.debug("path = {}", path);

    boolean deleted = controller.deleteFile(path);

    if (deleted) {
      // Nothing is left at this path, so stop reporting the stale entity.
      file = null;
    }

    return deleted;
  }

  /**
   * Moves this file to the path of the given destination.
   *
   * @param ftpFile destination
   * @return the controller's move result
   * @throws IllegalArgumentException if {@code ftpFile} is {@code null}
   */
  @Override
  public boolean move(final FtpFile ftpFile) {

    LOGGER.debug("ftpFile = {}", ftpFile);

    if (null == ftpFile) {
      throw new IllegalArgumentException("ftpFile is null");
    }

    boolean moved = controller.moveFile(path, ftpFile.getAbsolutePath());

    if (moved) {
      // The entity now lives at the destination path, not at this one.
      file = null;
    }

    return moved;
  }

  /** @return one file object per entry the controller lists for this path */
  @Override
  public List<CassandraFtpFile> listFiles() {

    LOGGER.debug("path = {}", path);

    return controller.listFiles(path)
            .stream().map(child -> new CassandraFtpFile(
            user, Paths.get(path, child.getName()).toString(), child, controller))
            .collect(toList());
  }

  /**
   * Opens a buffered stream for writing the file, creating and saving the entity first if the
   * file does not exist yet.
   *
   * @param offset must be 0
   * @return buffered output stream obtained from the controller
   * @throws IllegalArgumentException if {@code offset} is not 0
   * @throws IOException declared by the interface
   */
  @Override
  public OutputStream createOutputStream(final long offset) throws IOException {

    LOGGER.debug("offset = {}", offset);

    if (offset != 0) {
      throw new IllegalArgumentException("non-zero offset unsupported");
    }

    if (null == file) {
      file = new File();
      file.setName(baseName());
      file.setDirectory(false);
      file.setOwner(user.getName());
      file.setGroup(user.getName());
      file.setModified(System.currentTimeMillis());

      controller.saveFile(path, file);
    }

    return new BufferedOutputStream(controller.createOutputStream(file));
  }

  /**
   * Opens a buffered stream for reading the file.
   *
   * @param offset must be 0
   * @return buffered input stream obtained from the controller
   * @throws IllegalArgumentException if {@code offset} is not 0
   * @throws IOException if the file does not exist
   */
  @Override
  public InputStream createInputStream(final long offset) throws IOException {

    LOGGER.debug("offset = {}", offset);

    if (offset != 0) {
      throw new IllegalArgumentException("non-zero offset unsupported");
    }

    if (null == file) {
      // Fail with the declared checked exception rather than handing null to the controller.
      throw new IOException("file does not exist: " + path);
    }

    return new BufferedInputStream(controller.createInputStream(file));
  }

  /**
   * Returns the last element of the path, or the path itself when it has no name elements
   * (a root path), for which {@code getFileName()} returns {@code null}.
   */
  private String baseName() {

    java.nio.file.Path fileName = Paths.get(path).getFileName();

    return null == fileName ? path : fileName.toString();
  }
}

Example usage when used with an embedded FTP server:

/**
 * Starts an embedded FTP server on port 8021 whose file system is backed by the
 * "filesystem" keyspace of a Cassandra node at 127.0.0.1:9042, with users read from
 * users.properties.
 *
 * @throws FtpException if the server cannot be started
 */
private void exampleUsage() throws FtpException {

  // FTP listener
  final ListenerFactory listeners = new ListenerFactory();
  listeners.setPort(8021);

  final FtpServerFactory ftp = new FtpServerFactory();
  ftp.addListener("default", listeners.createListener());

  // Cassandra-backed file system
  final Cluster.Builder clusterBuilder = new Cluster.Builder();
  clusterBuilder.addContactPoints("127.0.0.1");
  clusterBuilder.withPort(9042);
  final Cluster cassandra = clusterBuilder.build();

  final FileSystemController fileSystem
          = new FileSystemController(cassandra.connect("filesystem"));
  ftp.setFileSystem(new CassandraFileSystemFactory(fileSystem));

  // users
  final PropertiesUserManagerFactory users = new PropertiesUserManagerFactory();
  users.setFile(new File("users.properties"));
  ftp.setUserManager(users.createUserManager());

  ftp.createServer().start();
}

The server reads its users from a properties file. In the example below the test username is testuser and the password is password, stored as its MD5 hash.

users.properties

# Home directory of the user "testuser".
ftpserver.user.testuser.homedirectory=/testuser
# MD5 hash of the plain-text password "password".
ftpserver.user.testuser.userpassword=5f4dcc3b5aa765d61d8327deb882cf99
# Presumably the maximum number of concurrent logins for this user - confirm against the FtpServer documentation.
ftpserver.user.testuser.maxloginnumber=3
# Gives the user write permission.
ftpserver.user.testuser.writepermission=true

Source Code

Build and Test

The project is a standard Maven project which can be built with:

mvn clean install