Tuesday, January 3, 2017

Multithreaded access to Documentum with DFC

Some tasks, for example those that communicate over TCP, may complete faster when several of them are executed simultaneously. Suppose an application has to make 250 calls to a web server that supports concurrent users. Making many of the calls at the same time will typically take less time than making them one after the other, because most of each call is spent waiting for the server. The example below mimics such an application: it compares the time required to download the yahoo.com web page 250 times in a row with the time required to download it the same number of times using 10 threads.

The MyConcurrency class contains the main method. It performs 250 sequential calls followed by 250 parallel calls and then calculates how much faster the concurrent execution is. These two steps are repeated several times (5 in the listing below) to obtain a more reliable average.

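import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import com.documentum.fc.common.DfException;

public class MyConcurrency {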
    int callNumber;
    int numberOfThreads;

    public MyConcurrency(int callNumber, int numberOfThreads) {
        this.numberOfThreads = numberOfThreads;
        this.callNumber = callNumber;
    }

    public static void main(String... args) throws DfException, InterruptedException, ExecutionException {
        int callNumber = 250; 
        int numberOfThreads = 10; // number of threads used to execute the calls defined in the line above
        for (int c = 0; c < 5; c++) {
            MyConcurrency i = new MyConcurrency(callNumber, numberOfThreads);
            i.executeBothSequentialAndConcurrentCalls(new MyHTTPRequest());       
        }
    }

    public double executeBothSequentialAndConcurrentCalls(MyRequest req) throws DfException, InterruptedException, ExecutionException {

        // measure how much time it takes to execute calls sequentially 
        long begin = System.currentTimeMillis();
        sequential(req);
        long end = System.currentTimeMillis();
        double sequentialDuration = end - begin;

        // measure how much time it takes to make calls simultaneously 
        begin = System.currentTimeMillis();
        concurrent(req);
        end = System.currentTimeMillis();
        double concurrentDuration = end - begin;
        double ratio = sequentialDuration / concurrentDuration;
        System.out.println("ratio total duration sequential/concurrent=" + ratio);
        return ratio;
    }

    // executes calls one by one
    List<Long> sequential(MyRequest req) throws DfException, InterruptedException {
        List<Long> durations = new ArrayList<>(); 
        for (int i = 0; i < callNumber; i++) {
            durations.add(req.request("S", i));
        }
        return durations; // individual durations are not discussed here to simplify the text
    }

    // tries to execute all calls simultaneously
    List<Long> concurrent(final MyRequest req) throws InterruptedException, ExecutionException {
        final List<Callable<Long>> partitions = new ArrayList<>();
        for (int i = 0; i < callNumber; i++) {
            final int j = i;
            partitions.add(new Callable<Long>() {
                @Override
                public Long call() throws Exception {
                    return req.request("C", j);
                }
            });
        }

        ExecutorService executorPool = Executors.newFixedThreadPool(numberOfThreads);
        List<Future<Long>> results = executorPool.invokeAll(partitions);
        // execution time of each single task; in parallel execution some tasks have to wait a long time for available resources
        List<Long> durations = new ArrayList<>();
        for (Future<Long> r : results) {
            durations.add(r.get());
        }
        executorPool.shutdown();
        return durations; // individual durations are not discussed here to simplify the text
    }
}
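
The listing prints the ratio of each round but does not average the ratios itself. A minimal sketch of that last step, assuming the values returned by executeBothSequentialAndConcurrentCalls are collected into a list. The RatioAverage class and its mean method are hypothetical helpers, not part of the original code, and the numbers in main are made up for illustration:

```java
import java.util.Arrays;
import java.util.List;

public class RatioAverage {

    // Arithmetic mean of the sequential/concurrent ratios collected over several rounds.
    static double mean(List<Double> ratios) {
        if (ratios.isEmpty()) {
            throw new IllegalArgumentException("at least one ratio is required");
        }
        double sum = 0;
        for (double ratio : ratios) {
            sum += ratio;
        }
        return sum / ratios.size();
    }

    public static void main(String[] args) {
        // Made-up ratios for illustration only; real values come from the measurement rounds.
        List<Double> ratios = Arrays.asList(2.0, 4.0, 6.0);
        System.out.println("average ratio=" + mean(ratios));
    }
}
```

In the main method of MyConcurrency, the return value of each round could be added to such a list inside the loop and averaged once the loop has finished.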

The abstract class MyRequest serves as the common base for subclasses that implement calls to websites or Documentum.

import com.documentum.fc.common.DfException;

public abstract class MyRequest {

    // execute a call to documentum or website, return the spent time
    long request(String objectNamePrefix, Integer objectNumber ) throws DfException, InterruptedException {
        long start = System.currentTimeMillis();

        execute(objectNamePrefix, objectNumber );
        long end = System.currentTimeMillis();

        return end - start;
    }

    abstract void execute(String objectNamePrefix, Integer objectNumber ) throws DfException, InterruptedException;
}

The subclass that downloads the Yahoo web page:

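import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URL;

import com.documentum.fc.common.DfException;

public class MyHTTPRequest extends MyRequest {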

    @Override
    void execute(String objectNamePrefix, Integer objectNumber) throws DfException, InterruptedException {
        // download a webpage to see how concurrency should work
        downloadWebsite();
    }
    
    // do something that can be executed faster concurrently 
    void downloadWebsite() {
        try {
            final URL url = new URL("http://www.yahoo.com");
            // try-with-resources closes the reader and the underlying stream after each download
            try (BufferedReader reader = new BufferedReader(new InputStreamReader(url.openConnection().getInputStream()))) {
                while (reader.readLine() != null) {
                    // read and discard the page content; only the elapsed time matters
                }
            }
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }
}

As expected, the output of the example above shows that downloading the Yahoo page 250 times is on average about 8 times faster with 10 concurrent threads than with sequential calls.
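
The effect does not depend on yahoo.com specifically. A minimal sketch, assuming that each call is dominated by waiting (simulated here with Thread.sleep instead of a real TCP round trip), shows the same pattern without network access. The LatencyDemo class, its method names and the 20 ms delay are illustrative assumptions, not part of the original example:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class LatencyDemo {

    // Hypothetical stand-in for a network call: the thread only waits.
    static void simulatedCall(long delayMillis) throws InterruptedException {
        Thread.sleep(delayMillis);
    }

    // Runs the calls one after the other and returns the elapsed milliseconds.
    static long runSequential(int calls, long delayMillis) throws InterruptedException {
        long begin = System.nanoTime();
        for (int i = 0; i < calls; i++) {
            simulatedCall(delayMillis);
        }
        return (System.nanoTime() - begin) / 1_000_000;
    }

    // Runs the same calls on a fixed thread pool and returns the elapsed milliseconds.
    static long runConcurrent(int calls, int threads, long delayMillis) throws Exception {
        List<Callable<Void>> tasks = new ArrayList<>();
        for (int i = 0; i < calls; i++) {
            tasks.add(() -> {
                simulatedCall(delayMillis);
                return null;
            });
        }
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            long begin = System.nanoTime();
            for (Future<Void> result : pool.invokeAll(tasks)) {
                result.get(); // propagates any failure from a task
            }
            return (System.nanoTime() - begin) / 1_000_000;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        long sequential = runSequential(50, 20);
        long concurrent = runConcurrent(50, 10, 20);
        System.out.println("sequential=" + sequential + " ms, concurrent=" + concurrent + " ms");
    }
}
```

With a pure sleep the ratio should approach the number of threads. A real server adds its own limits, which is probably why the measured speed-up for yahoo.com stays somewhat below 10.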

What HTTP requests and DFC calls to Documentum have in common is that the communication occurs over TCP. So let's adapt the example above to evaluate whether multithreading and parallelization can speed up an application that interacts with a Documentum repository. To compare serial and parallel calls to Documentum, the main method has to be extended and some auxiliary methods have to be added:

public static void main(String... args) throws DfException, InterruptedException, ExecutionException {
        int callNumber = 250;
        int numberOfThreads = 10;
        for (int c = 0; c < 5; c++) {
            System.out.println(">>>Http Control ");
            MyConcurrency i = new MyConcurrency(callNumber, numberOfThreads);
            i.executeBothSequentialAndConcurrentCalls(new MyHTTPRequest());

            System.out.println(">>> Documentum operations using the same session");
            String targetFolderId = createNewFolder(); // all objects will be linked to this folder
            IDfSession commonSession = getNewSession();
            i.executeBothSequentialAndConcurrentCalls(new MyDocumentumRequestUsingOneSession(targetFolderId, commonSession));
            commonSession.disconnect();

            System.out.println(">>> Documentum operations using sessions from pool");
            int numberOfSessionsInPool = numberOfThreads;
            BlockingQueue<IDfSession> sessionPool = createSessionPool(numberOfSessionsInPool);
            i.executeBothSequentialAndConcurrentCalls(new MyDocumentumRequestUsingSessionsFromPool(targetFolderId, sessionPool));
            releaseSessionsInPool(sessionPool);

            System.out.println(">>> Documentum operations using a new session for each call");
            i.executeBothSequentialAndConcurrentCalls(new MyDocumentumRequestUsingNewSessionEachTime(targetFolderId));
        }
    }

    // creates a folder to contain created documentum objects
    static String createNewFolder() throws DfException {
        IDfSession session = getNewSession();
        IDfFolder folder = (IDfFolder) session.newObject("dm_folder");
        folder.setObjectName("Test" + System.currentTimeMillis());
        folder.link(baseFolderPath);
        folder.save();
        String targetFolderId = folder.getObjectId().getId();
        session.disconnect();
        return targetFolderId;
    }

    static BlockingQueue<IDfSession> createSessionPool(int numberOfSessionsInPool) throws DfException, InterruptedException {
        BlockingQueue<IDfSession> sessionPool = new LinkedBlockingQueue<>();
        for (int i = 0; i < numberOfSessionsInPool; i++) {
            sessionPool.put(getNewSession());
        }
        return sessionPool;
    }

    static void releaseSessionsInPool(BlockingQueue<IDfSession> sessionPool) throws DfException {
        // poll() returns null once the queue is empty, so no exception is needed to end the loop
        IDfSession session;
        while ((session = sessionPool.poll()) != null) {
            session.disconnect();
        }
    }

    static IDfSession getNewSession() throws DfException {
        return connection.ConnectionFactory.getSession();
    }

    static String baseFolderPath = "/Formation";

Now serial and concurrent calls to a Documentum repository can be compared when Documentum sessions are obtained in 3 different ways:

  • one shared session is used for all the calls
  • each call waits for a free session in a pool of premade sessions (the size of the pool equals the number of threads)
  • each call creates a new session of its own

The first option is implemented in the class below:

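import java.util.Date;

import com.documentum.fc.client.IDfSession;
import com.documentum.fc.client.IDfSysObject;
import com.documentum.fc.common.DfException;

public class MyDocumentumRequestUsingOneSession extends MyRequest {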

    String targetFolderId;
    IDfSession commonSession;

    MyDocumentumRequestUsingOneSession(String targetFolderId, IDfSession commonSession) {
        this.targetFolderId = targetFolderId;
        this.commonSession = commonSession;
    }

    @Override
    void execute(String objectNamePrefix, Integer objectNumber ) throws DfException, InterruptedException {
        accessDocumentum(objectNamePrefix, objectNumber, commonSession);
    }

    // do some work in the docbase: create a document in the target folder, then fetch and update it
    void accessDocumentum(String objectNamePrefix, int objectNumber, IDfSession session) throws DfException {
        String objectName = objectNamePrefix + System.currentTimeMillis() + "; object=" + objectNumber + "; thread=" + Thread.currentThread().getName() + "; session=" + session.getSessionId();
        IDfSysObject targetObject = (IDfSysObject) session.newObject("dm_document");
        targetObject.setObjectName(objectName);
        targetObject.link(targetFolderId);
        targetObject.save();
        targetObject.fetch(null);
        targetObject.setTitle("current time " + new Date().getTime());
        targetObject.save();
    }
}

The second case is implemented by the class below:

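import java.util.concurrent.BlockingQueue;

import com.documentum.fc.client.IDfSession;
import com.documentum.fc.common.DfException;

public class MyDocumentumRequestUsingSessionsFromPool extends MyDocumentumRequestUsingOneSession {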

    BlockingQueue<IDfSession> sessionPool;

    public MyDocumentumRequestUsingSessionsFromPool( String targetFolderId, BlockingQueue<IDfSession> sessionPool) {
        super(targetFolderId, null);
        this.sessionPool = sessionPool;
    }

    @Override
    void execute(String objectNamePrefix, Integer objectNumber) throws DfException, InterruptedException {
        IDfSession sessionFromPool = getNextPooledSession();
        try {
            // do something time-consuming in the docbase
            accessDocumentum(objectNamePrefix, objectNumber, sessionFromPool);
        } finally {
            // return the session even if the call fails, otherwise the pool gradually drains
            returnSessionIntoPool(sessionFromPool);
        }
    }

    IDfSession getNextPooledSession() throws InterruptedException {
        return sessionPool.take();
    }

    void returnSessionIntoPool(IDfSession session) throws InterruptedException {
        sessionPool.put(session);
    }
}

And the creation of a new session for every call to Documentum is implemented as follows:

import com.documentum.fc.client.IDfSession;
import com.documentum.fc.common.DfException;

public class MyDocumentumRequestUsingNewSessionEachTime extends MyDocumentumRequestUsingOneSession {

    public MyDocumentumRequestUsingNewSessionEachTime(String targetFolderId) {
        super(targetFolderId, null);
    }

    @Override
    void execute(String objectNamePrefix, Integer objectNumber) throws DfException, InterruptedException {
        IDfSession newSession = getNewSession();
        try {
            // do something time-consuming in the docbase
            accessDocumentum(objectNamePrefix, objectNumber, newSession);
        } finally {
            // disconnect even if the call fails, so that no session is leaked
            newSession.disconnect();
        }
    }

    IDfSession getNewSession() throws DfException {
        return connection.ConnectionFactory.getSession();
    }
}

Results from 10 consecutive executions of the three options listed above:

I tested the application with two different Documentum servers. The absolute times and the ratios varied, but the pattern was the same. The results above were obtained with the Documentum Developer Edition 7.2 image, which has 2 processors and little memory. With a production-quality server, the maximum speed-up that the application demonstrated was over 3-fold.

Using one shared session gives no performance advantage for concurrent calls. This is very much expected, because all DFC methods that interact with Documentum are synchronized. (Subclasses of IDfOperation are an exception: if you try to execute, for example, IDfCopyOperation concurrently, at best you will receive an error, and usually the docbase will strangely stop without leaving any errors in the log.) So even if many threads share the same session, they will mostly wait until the session object becomes unlocked.
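
The way synchronized methods serialize threads can be illustrated without a repository. The sketch below assumes a hypothetical FakeSession class whose only method is synchronized and simply sleeps. It is not DFC code and does not model the actual locking inside DFC. It only contrasts 10 threads sharing one such object with 10 threads that each borrow their own object from a BlockingQueue:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;

public class SharedSessionDemo {

    // Hypothetical stand-in for a session whose methods are synchronized.
    static class FakeSession {
        synchronized void work(long delayMillis) throws InterruptedException {
            Thread.sleep(delayMillis); // the lock is held while "waiting for the server"
        }
    }

    // Executes all tasks on a fixed thread pool and returns the elapsed milliseconds.
    static long timeTasks(List<Callable<Void>> tasks, int threads) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        try {
            long begin = System.nanoTime();
            for (Future<Void> result : executor.invokeAll(tasks)) {
                result.get();
            }
            return (System.nanoTime() - begin) / 1_000_000;
        } finally {
            executor.shutdown();
        }
    }

    // All threads call the same object, so the calls run one at a time.
    static long timeSharedSession(int calls, int threads, long delayMillis) throws Exception {
        FakeSession shared = new FakeSession();
        List<Callable<Void>> tasks = new ArrayList<>();
        for (int i = 0; i < calls; i++) {
            tasks.add(() -> {
                shared.work(delayMillis);
                return null;
            });
        }
        return timeTasks(tasks, threads);
    }

    // Each thread borrows its own object, so the calls can overlap.
    static long timePooledSessions(int calls, int threads, long delayMillis) throws Exception {
        BlockingQueue<FakeSession> pool = new LinkedBlockingQueue<>();
        for (int i = 0; i < threads; i++) {
            pool.put(new FakeSession());
        }
        List<Callable<Void>> tasks = new ArrayList<>();
        for (int i = 0; i < calls; i++) {
            tasks.add(() -> {
                FakeSession session = pool.take();
                try {
                    session.work(delayMillis);
                } finally {
                    pool.put(session); // always hand the object back
                }
                return null;
            });
        }
        return timeTasks(tasks, threads);
    }

    public static void main(String[] args) throws Exception {
        System.out.println("shared=" + timeSharedSession(50, 10, 20) + " ms");
        System.out.println("pooled=" + timePooledSessions(50, 10, 20) + " ms");
    }
}
```

In this simplified model the shared object takes roughly as long as a sequential run, while the pooled variant finishes in a fraction of that time.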

In contrast, both concurrent use of sessions from a pool and creating a new session for every call speed up the execution slightly. Clearly, the fastest execution is achieved with multiple premade pooled sessions.
