Saltar al contenido

Conservar los Datos Entrantes para un Procesamiento Posterior

Caso de Uso

Un caso de uso frecuente con escenarios en tiempo real/controlados por eventos es qué hacer cuando el destino del extremo no está disponible. Un ejemplo es el escenario en el que una API de Jitterbit recibe un mensaje saliente de Salesforce y lo conecta a una base de datos. Si la base de datos se desconecta, la operación Jitterbit que se conecta a la base de datos generará un error y activará la ruta de operación 'En caso de falla'. Si no se hace nada para manejar este tipo de error, existe el riesgo de pérdida de datos.

Una forma de manejar este escenario es conservar la carga útil entrante hasta que se procese correctamente. Por ejemplo, si el extremo de la base de datos está fuera de línea, los mensajes se mantienen hasta que la base de datos vuelve a estar en línea. Los mensajes se procesarán automáticamente cuando la base de datos vuelva a estar en línea.

Nota

Este patrón de diseño usa Design Studio como ejemplo; puede aplicar los mismos conceptos en Cloud Studio usando pasos similares.

Ejemplo

En el siguiente ejemplo, el cliente no solo quería la persistencia del mensaje, sino que también quería garantizar la integridad de los cambios en el pedido. Es decir, si al pedido número 1000 le siguieron 2 cambios en el mismo pedido, todos los cambios debían procesarse correctamente en el orden correcto. Esto es similar al principio de 'atomicidad' para transacciones de bases de datos. En este ejemplo, al crear un pedido se crea un archivo independiente para cada pedido. Cada cambio en el procesamiento o estado de la orden también crea un archivo separado.

Dado que el cliente tenía más de un Agente Privado, también teníamos que asegurarnos de que cada agente estuviera al tanto del procesamiento realizado por los otros agentes.

La solución fue usar bloqueos de mensajes guardados en archivos en un sistema de archivos al que podían acceder todos los agentes. Por ejemplo, el Agente Privado 100 está en el Host A y el Agente Privado 200 está en el Host B. El sistema de archivos que contiene los archivos de bloqueo está en el Host C. Tanto el Host A como el Host B deben tener acceso al sistema de archivos en el Host C.

adjunto

Esto es necesario para que cada agente esté al tanto de los bloqueos colocados en los pedidos por todos los demás agentes.

El método por el cual un Agente Privado se identifica de forma única es mediante la edición del archivo .conf y la asignación de una variable global (consulte el uso de Variables globales) a un valor fijo. No se pudieron utilizar variables de proyecto, ya que su alcance es a través de todos los Agentes Privados.

adjunto

En este caso, el $servername es 100. El $servername para otro Agente Privado sería 200, etcétera.

Para comenzar, suponga que los mensajes entrantes se escriben en una fuente de archivo temporal llamada "Archivos". Suponga también que hay una fuente llamada "Bloqueos", que es un directorio en el sistema de archivos compartidos.

Comenzamos generando algunos datos de prueba:

/*
Will generate test order files (like 1000, 1001, 1002) and will generate additional orders (1001, 1001, 1001) randomly. Controlled by 'cnt'. so cnt = 10 will
generate 10 base orders plus some number of duplicates
*/

DeleteFiles("<TAG>Sources/Files</TAG>","*");
DeleteFiles("<TAG>Sources/Locks</TAG>","*");
cnt = 3; i = 0;seed=1000;
While(i<cnt,
 rcnt=Random(1,3);ri=0;
 While(ri<rcnt,
 WriteFile("<TAG>Targets/Files</TAG>","foobar",(seed)+"."+CVTDate(Now(),"yyyy-mm-dd HH:MM:SS","yyyymmddHHMMSS"));
 sleep(1);
 ri+=1
 );
i+=1; seed +=1
);

FlushAllFiles("<TAG>Targets/Files</TAG>");

Nota

Usando FlushAllFiles() después WriteFile() se recomienda como mejor práctica.

Como se indica en el comentario, el propósito es generar algunos pedidos entrantes (3 en este caso) con marcas de tiempo.

Lista de orden:

1000.20160322152502

1000.20160322152503

1001.20160322152504

1001.20160322152505

1001.20160322152506

1002.20160322152507

1002.20160322152508

1002.20160322152509

Tenemos 2 entradas para 1000, 3 entradas para 1001 y 3 entradas para 1002. Esto emula el escenario donde se recibió el pedido 1000 junto con 1 cambio a ese pedido, y así sucesivamente para los otros dos pedidos.

adjunto

Hay dos operaciones en esta cadena. La primera operación es el preprocesamiento del pedido y la segunda es un marcador de posición para la actualización de datos reales en el destino, como una base de datos o una actualización del servicio web.

El secuencia de comandos contenido en la primera operación se encuentra a continuación. Verifica la fuente de 'Archivos' para pedidos, luego verifica la fuente de 'Bloqueos' para cualquier bloqueo de pedido. Si se encuentran bloqueos, omite el procesamiento de ese pedido, así como cualquier actualización de ese pedido. Si no se encuentran bloqueos, se crea un archivo de bloqueo y se llama a la operación que procesa el pedido. Si tiene éxito, se eliminan tanto el archivo como el bloqueo.

/*
1. Get list of order files with file naming convention of <filename>.<timestamp> where timestamp is yyyymmddhhmmss.
For example, 1001.20160220171811. This is done to aid in sorting the array.
2. Loop through the list of orders and check for locks. Locks have a file naming convention of <orderno>.<timestamp>.<agentnumber>
The agent number is set in the .conf file of the Private Agent. Assign a unique number to each Private Agent in the cluster.
The agent number will be useful to determine which agent set the lock. If the lock was set due to the agent failing during processing,
the user can elect to remove those locks as they were not due to the endpoint being down, or operation failure.
3. If a lock file is found for an order, then keep looping. If no lock is found for a file, the loop is stopped, go on to Step 2
4. If unlocked file found, lock file created.
5. Pass file name to synchronous operation and check operation status. If successful, then delete file and delete lock.
Optional: set up global 'datastatus' that will indicate pass/fail of insert into target and change
next statement to use it: If($datastatus || status ...
Configure the processing operation to have a short timeout (<5 min)
**/

unlock = true;
arr = FileList("<TAG>Sources/Files</TAG>","files","*");//reads list of files into array
  SortArray(arr);//sorts
WriteToOperationLog("List: "+arr);
cnt=Length(arr);i=0;//loops


// STEP 1: Find a file with no locks


While(i<cnt,

$filename=arr[i];//gets first file
 orderno=Split($filename,".");//gets order no
 WriteToOperationLog("File: "+$filename+" orderno: "+orderno[0]);
 locks = FileList("<TAG>Sources/Locks</TAG>","locks",orderno[0]+"*"); //checks of order no exists in list of locks
 nolock = If(Length(locks)>0,false,true); // if false then found lock file,skip, else can use this file
 WriteToOperationLog("Locks: "+locks+" nolock: "+nolock);
 If(nolock,
 i=cnt;

WriteToOperationLog("Found unlocked order, stop loop");

 //get file with this order no and sets incrementer to counter, stopping loop

);

 i++;

);


// STEP 2: If unlocked file found, process.


If(nolock,

 WriteToOperationLog("Selected File: "+$filename);
 lockfilename = orderno[0]+"."+CVTDate(Now(),"yyyy-mm-dd HH:MM:SS","yyyymmddHHMMSS")+"."+$servername;
 WriteFile("<TAG>Targets/Locks</TAG>",$filename,lockfilename);// create lock file.
 FlushAllFiles("<TAG>Targets/Locks</TAG>");
 WriteToOperationLog("Lockfile: "+lockfilename);

 status = RunOperation("<TAG>Operations/3. Fake Op</TAG>");//pass filename to Operation that will process data. status is success/fail of the operation.

 // deletes order file if status is true, else file is not deleted. Also sets unlock to false, if the operation not successful, then lock is not deleted

If(status,

 DeleteFile("<TAG>Sources/Files</TAG>",$filename);
 WriteToOperationLog("delete file "+ $filename),
 unlock = false

 );

// unlockme = Split($filename,".");

// gets order number:

If(unlock, DeleteFiles("<TAG>Sources/Locks</TAG>",orderno[0]+"*");

 WriteToOperationLog("delete lock for order: "+orderno[0]+"*"));
 // deletes lock file after all the files for that order are processed,
 // but leaves lock if any of the RunOps are unsuccessful
)

Hay una serie de funciones clave que lo hacen posible. Desde el FileList() la función genera una matriz, podemos usar la función SortArray() función para darnos el archivo más antiguo basado en la marca de tiempo en el nombre del archivo. No tenemos una función que clasifique un archivo por fecha de creación. El WriteToOperationLog() la función es generalmente para la depuración, por lo que sus instancias se pueden eliminar con fines de producción.

El secuencia de comandos 'Fake Op' usa el RaiseError() función para simular un fallo y generar un bloqueo:

WriteToOperationLog(Now());
RaiseError("foo")

Después de una ejecución, podemos verificar las fuentes:

adjunto

Muestra un solo bloqueo en el pedido 1000 realizado por el agente 100 y que todos los pedidos están sin procesar.

Si comentamos el RaiseError() para que procese correctamente y vuelva a ejecutar 'Comprobar archivos para procesar'...

adjunto

El bloqueo en el pedido #1000 permanece y el primer pedido 1001 (1001.20160322152504) se procesa y elimina.

Ejecute el proceso dos veces más...

adjunto

... y se procesan todos los pedidos 1001, pero el bloqueo permanece en el pedido 1000 y no se procesa ninguno de los archivos del pedido 1000.

Pero, ¿y si se resuelve el problema con el extremo ? Necesitamos una forma de quitar los candados e intentarlo de nuevo. El enfoque es "envejecer" los bloqueos y eliminarlos, lo que permite volver a intentar las órdenes automáticamente. Si el extremo sigue sin estar disponible, se restablece el bloqueo.

Este secuencia de comandos se puede ejecutar periódicamente para borrar bloqueos antiguos. Lee la fuente de "Bloqueos", compara la marca de tiempo con la hora actual y, si tiene más de 5 minutos, eliminará el bloqueo.

locks = FileList("<TAG>Sources/Locks</TAG>","locks","*");

SortArray(locks);
locks;
cnt = length(locks); i=0;
While(i<cnt,
order = Split(locks[i],".");
orderno = order[0];
now = (CVTDate(Now(),"yyyy-mm-dd HH:MM:SS","yyyymmddHHMMSS"));

// set interval for 5 min, 300 sec;

If(Double(now)-Double(order[1])>$deletelocktime,del = true;WriteToOperationLog("order lock file to delete"+orderno);DeleteFiles("<TAG>Sources/Locks</TAG>",orderno+"*"),
 WriteToOperationLog("Locks: "+locks+" below time limit")
 );
i+=1);

Tenga en cuenta que esto utiliza una variable de proyecto, $deletelocktime, que contiene el número de segundos para envejecer la cerradura.

adjunto