Persist Inbound Data for Later Processing

Use Case

A frequent question in real-time, event-driven scenarios is what to do when the target endpoint is unavailable. An example is the scenario where a Salesforce outbound message is received by a Jitterbit API and interfaced to a database. If the database goes offline, the Jitterbit operation that connects to the database throws an error and activates the 'On Failure' operation path. If nothing is done to handle this kind of error, there is a risk of data loss.

One way to handle this scenario is to persist the incoming payload until it is successfully processed. For example, if the database endpoint is offline, messages are persisted until the database is back online, at which point they are automatically processed.

Note

This design pattern uses Design Studio as an example; you may apply the same concepts in Cloud Studio using similar steps.

Example

In the following example, the customer not only wanted message persistence, but also wanted to guarantee the integrity of the order changes. That is, if order number 1000 was followed by 2 changes to the same order, all the changes had to be processed successfully and in the correct sequence. This is similar to the 'atomicity' principle for database transactions. In this example, creating an order creates a separate file for each order, and each change in processing or status of the order also creates a separate file.

Since the customer had more than one Private Agent, we also had to ensure that each agent was aware of processing performed by the other agents.

The solution was to use message locks held in files on a file system that was accessible by all agents. For example, Private Agent 100 is on Host A and Private Agent 200 is on Host B. The file system holding the lock files is on Host C. Both Host A and Host B must have access to the file system on Host C.


This is necessary so that each agent is aware of locks placed on orders by all other agents.

A Private Agent is uniquely identified by editing its .conf file and assigning a global variable (see Using Global Variables) a fixed value. Project variables cannot be used for this, since their scope spans all Private Agents.
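For example, on the agent designated 100, the setting might look like this in jitterbit.conf (a minimal sketch; [PredefinedGlobalVariables] is the configuration file section that defines global variables, and the name servername is this example's choice):

[PredefinedGlobalVariables]
servername=100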


In this case, the $servername is 100. The $servername for another Private Agent would be 200, and so on.

To start, assume that the incoming messages are written to a temporary file source called "Files". Also assume that there is a source called "Locks", which is a directory on the shared file system.
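As a minimal sketch, an inbound message could be persisted to "Files" with the naming convention used throughout (the globals $message and orderno are hypothetical here, standing in for however the API script captures the payload and the order number):

// persist the payload as <orderno>.<timestamp> so it survives an endpoint outage
WriteFile("<TAG>Targets/Files</TAG>",$message,orderno+"."+CVTDate(Now(),"yyyy-mm-dd HH:MM:SS","yyyymmddHHMMSS"));
FlushAllFiles("<TAG>Targets/Files</TAG>");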

We begin by generating some test data:

/*
Generates test order files (like 1000, 1001, 1002) and randomly generates
additional entries for the same orders (like 1001, 1001, 1001). Controlled by
'cnt': cnt = 10 will generate 10 base orders plus some number of duplicates.
*/

DeleteFiles("<TAG>Sources/Files</TAG>","*");
DeleteFiles("<TAG>Sources/Locks</TAG>","*");
cnt = 3; i = 0; seed = 1000;
While(i < cnt,
  rcnt = Random(1,3); ri = 0;
  While(ri < rcnt,
    // file name is <orderno>.<timestamp>; Sleep(1) guarantees unique timestamps
    WriteFile("<TAG>Targets/Files</TAG>","foobar",(seed)+"."+CVTDate(Now(),"yyyy-mm-dd HH:MM:SS","yyyymmddHHMMSS"));
    Sleep(1);
    ri += 1
  );
  i += 1; seed += 1
);

FlushAllFiles("<TAG>Targets/Files</TAG>");

Note

Using FlushAllFiles() after WriteFile() is recommended as a best practice: writes to a file target are buffered, and flushing forces the buffered files to be written out immediately rather than when the operation completes.

As noted in the comment, the purpose is to generate some incoming orders (3 in this case) with timestamps.

Order List:

1000.20160322152502
1000.20160322152503
1001.20160322152504
1001.20160322152505
1001.20160322152506
1002.20160322152507
1002.20160322152508
1002.20160322152509

We have 2 entries for 1000, 3 entries for 1001 and 3 entries for 1002. This emulates the scenario where order 1000 was received along with 1 change to that order, and so on for the other two orders.


The processing chain consists of two operations. The first operation is the order pre-processing, and the second is a placeholder for the actual data update to the target, such as a database or web service update.

The script contained in the first operation is below. It checks the 'Files' source for orders, then checks the 'Locks' source for any order locks. If a lock is found, it skips that order as well as any updates to that order. If no lock is found, a lock file is created and the operation that processes the order is called. If that operation succeeds, both the order file and the lock are deleted.

/*
1. Get the list of order files, with a file naming convention of <orderno>.<timestamp>, where the timestamp is yyyymmddHHMMSS.
For example, 1001.20160220171811. This is done to aid in sorting the array.
2. Loop through the list of orders and check for locks. Locks have a file naming convention of <orderno>.<timestamp>.<agentnumber>.
The agent number is set in the .conf file of the Private Agent. Assign a unique number to each Private Agent in the cluster.
The agent number is useful for determining which agent set a lock. If a lock was left behind by an agent failing during processing,
the user can elect to remove it, since it was not due to the endpoint being down or an operation failure.
3. If a lock file is found for an order, keep looping. If no lock is found for a file, stop the loop and go on to the next step.
4. If an unlocked file is found, create a lock file.
5. Pass the file name to a synchronous operation and check the operation status. If successful, delete the file and the lock.
Optional: set up a global 'datastatus' that indicates pass/fail of the insert into the target, and change the
next statement to use it: If($datastatus || status ...
Configure the processing operation to have a short timeout (<5 min).
*/

unlock = true;
nolock = false; // becomes true when an order with no lock is found
arr = FileList("<TAG>Sources/Files</TAG>","files","*"); // reads the list of files into an array
SortArray(arr); // earliest file first, based on the timestamp in the name
WriteToOperationLog("List: "+arr);
cnt = Length(arr); i = 0;


// STEP 1: Find a file with no locks


While(i < cnt,
  $filename = arr[i]; // next file in timestamp order
  orderno = Split($filename,"."); // orderno[0] is the order number
  WriteToOperationLog("File: "+$filename+" orderno: "+orderno[0]);
  locks = FileList("<TAG>Sources/Locks</TAG>","locks",orderno[0]+"*"); // checks if the order number appears in the list of locks
  nolock = If(Length(locks)>0,false,true); // false: a lock file was found, skip this order; true: this file can be used
  WriteToOperationLog("Locks: "+locks+" nolock: "+nolock);
  If(nolock,
    // unlocked order found: move the counter to the end to stop the loop
    i = cnt;
    WriteToOperationLog("Found unlocked order, stop loop")
  );
  i += 1
);


// STEP 2: If an unlocked file was found, process it.


If(nolock,
  WriteToOperationLog("Selected File: "+$filename);
  lockfilename = orderno[0]+"."+CVTDate(Now(),"yyyy-mm-dd HH:MM:SS","yyyymmddHHMMSS")+"."+$servername;
  WriteFile("<TAG>Targets/Locks</TAG>",$filename,lockfilename); // create the lock file
  FlushAllFiles("<TAG>Targets/Locks</TAG>");
  WriteToOperationLog("Lockfile: "+lockfilename);

  // $filename is picked up by the operation that processes the data;
  // status holds the success/failure of that operation
  status = RunOperation("<TAG>Operations/3. Fake Op</TAG>");

  // delete the order file only if the operation succeeded; otherwise
  // set unlock to false so the lock is left in place
  If(status,
    DeleteFile("<TAG>Sources/Files</TAG>",$filename);
    WriteToOperationLog("delete file "+$filename),
    unlock = false
  );

  // delete the lock for this order after its file is processed,
  // but leave the lock if the RunOperation was unsuccessful
  If(unlock,
    DeleteFiles("<TAG>Sources/Locks</TAG>",orderno[0]+"*");
    WriteToOperationLog("delete lock for order: "+orderno[0]+"*")
  )
)

There are a number of key functions that make this possible. Since FileList() returns an array, SortArray() gives us the earliest file based on the timestamp embedded in the file name; there is no built-in function that sorts files by their creation date. The WriteToOperationLog() calls are there for debugging and can be removed for production.
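As a minimal sketch of that idea (assuming the <orderno>.<yyyymmddHHMMSS> naming convention above, where zero-padded timestamps make a lexical sort also a chronological sort):

files = FileList("<TAG>Sources/Files</TAG>","files","*");
SortArray(files); // ascending lexical sort == oldest timestamp first
earliest = If(Length(files)>0, files[0], ""); // guard against an empty source
WriteToOperationLog("Earliest file: "+earliest);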

The 'Fake Op' script uses the RaiseError() function to simulate a failure and generate a lock; because the operation fails, RunOperation() returns false in the calling script and the order file and its lock are kept:

WriteToOperationLog(Now());
RaiseError("foo")

After one run, checking the sources shows a single lock on order 1000 placed by agent 100, with all of the orders still unprocessed.

If we comment out the RaiseError() so that the operation processes successfully and rerun 'Check Files to Process', the lock on order #1000 remains, and the first 1001 order (1001.20160322152504) is processed and deleted.

Run the process two more times, and all of the 1001 orders are processed, but the lock remains on order 1000 and none of the order 1000 files are processed.

But what if the issue with the endpoint is resolved? We need a way to remove the locks and try again. The approach is to "age" the locks and remove them after a set time, allowing the orders to be retried automatically. If the endpoint is still unavailable, the retry fails and the lock is simply recreated.

This script can be run periodically to clear out old locks. It reads the "Locks" source, compares each lock's timestamp against the current time, and deletes any lock older than the configured limit (5 minutes in this example).

locks = FileList("<TAG>Sources/Locks</TAG>","locks","*");
SortArray(locks);
cnt = Length(locks); i = 0;
While(i < cnt,
  order = Split(locks[i],".");
  orderno = order[0];
  now = CVTDate(Now(),"yyyy-mm-dd HH:MM:SS","yyyymmddHHMMSS");

  // $deletelocktime holds the age limit in seconds (300 for 5 minutes).
  // Note: subtracting two yyyymmddHHMMSS values only approximates elapsed
  // seconds; across minute/hour/day boundaries the difference is inflated,
  // so a lock may be removed somewhat earlier than the configured limit.
  If(Double(now)-Double(order[1])>$deletelocktime,
    WriteToOperationLog("order lock file to delete: "+orderno);
    DeleteFiles("<TAG>Sources/Locks</TAG>",orderno+"*"),
    WriteToOperationLog("Locks: "+locks+" below time limit")
  );
  i += 1
);

Note that this uses a project variable, $deletelocktime, that holds the number of seconds a lock is allowed to age before being removed; with $deletelocktime set to 300, any lock older than five minutes is deleted on the next run.
