Création d’un framework d’ETL en Node.js

Contexte

Les effets indésirables des médicaments constituent une des première cause de mortalité dans le monde(1) et les interactions entre ces médicaments représentent environ 40% de ces effets (2). Il est donc compréhensible que ce domaine de recherche soit très prolifique, et c’est dans ce contexte que le projet de notre client prend place.

En effet, le but était d’arriver à constituer une base de données exploitable par l’équipe de recherche en question, c’est à dire leur permettant d’avoir des données « propres » et dans un format dont elle pourrait se servir pour faire de l’analyse de données (notamment des calculs statistiques) afin d’en tirer de nouvelles conclusions visant à informer les décisions de prescription de médicaments.

L’une des principales sources de données à disposition pour ce sujet est la base des Drug Adverse Events de la Food And Drug Admnistration (FDA) américaine qui a mis en place le plus important système de reporting d’évènements non désirés au monde concernant les produits aussi bien cosmétique, que les traitements anti-cancéreux. Cette base de données repertorie tous les cas depuis 2002 et est accessible sous différentes formes : API, téléchargement de fichier json ou csv… Elle est mise à jour tous les trimestres et représente un volume d’environ 15Go (dépend du format) à la mi 2021.

Cependant, La qualité de la donnée varie grandement entre les cas rapportés. En effet, d’une part, le système de reporting a évolué au cours des années, et d’autre part, les données sont entrées manuellement par les praticiens. En analysant les données, on se rend compte notamment que certains champs ne sont pas codifiés mais en langage naturel, comme par exemple les noms de produits. Cela implique donc de nombreux traitements et croisement de données avec d’autres bases afin d’uniformiser l’information entre les cas rapportés.

Quels besoins techniques ?

Les besoins techniques du projet se profilent donc rapidement : il va falloir créer un pipeline permettant d’effectuer des actions de manière séquentielle permettant notamment : d’importer des données de différentes sources et de différents types, de les traiter (filter, modifier), puis de les enregistrer dans une base de données de manière à ce qu’elles soient exploitables par l’équipe scientifique.

On souhaite aussi que la base de données puisse être reconstruite à la demande et l’on doit pouvoir la mettre à jour régulièrement, chaque trimestre, au fur et à mesure que de nouveaux cas sont répertoriés et mis à disposition par la FDA.

Quelles difficultées spécifiques au BigData ?

Deux principales difficultés, caractéristiques du BigData, vont avoir un impact fort sur la conception du framework : le volume de données et la non structuration / la qualité de la donnée.

Le volume de données tout d’abord, implique une certaine rigueur algorithmique afin de trouver un équilibre entre la gestion des ressources de la machine et le temps d’execution du pipeline. Des solutions telles que le multi-threading et l’utilisation des stream notamment pour lire les fichiers volumineux vont être ici cruciales, ainsi que des méthodes telles que COPY en PostgreSQL, ou insertMany en MongoDB.

Ensuite, la diversité de format des données (csv, json, xml) et leur (mauvaise) qualité, implique d’avoir à disposition différents parser (en mode stream) et un système efficace de transformation et d’association.

Principales caractéristiques du framework

Philosophie

Le code est organisé en tâches (télécharger un fichier puis le dézipper, lire le fichier et extraire les données…), qui sont ensuite déclarées dans des séquences (init, update…). Chaque tâche est autoportantes, afin que la séquence soit reproductible dans le temps. Le choix a été fait de minimiser l’utilisation de l’espace disque (notamment lors des téléchargement, les fichiers sont traités tout de suite après), et de maximiser l’utilisation des ressources de la machine (utiliser le plus de coeurs et de mémoire possible) afin d’accélerer l’exécution du pipeline.

/*
 * Exemple de tâche
 */
pipelineManager.registerPipeline({
  id: 'clean_database',
  description: 'Initializing database',
  tasks: async () => {
    await db.dropDatabase();
  }
});

/*
 * Exemple de pipeline (séquence)
 */
return await pipelineManager.execute([
    'clean_database',
    'etl_open_fda_drug_events',
    'build_drug_ingredient_brand_name_mapping',
    'map_drug_names_to_dci',
    'map_indications_to_meddra',
    'prepare_adverse_events',
    'search_potential_covariables',
    'compute_drugs_interaction_effect',
]);

Des outils pour éviter de se répéter et standardiser

Différentes classes d’utilitaires permettent de faciliter les tâches courantes, comme lire un fichier csv, json ou xml, télécharger un fichier et le dézipper etc. Un utilitaire plus « interne » permet de formatter les logs de manière à fournir une information claire sur le déroulement de la séquence à l’utilisateur au fur et à mesure. Ces fonctions permettent donc de créer de nouvelles tâches rapidement en sachant que les actions sous-jacentes ont été optimisées.

J’ai notamment créé pour l’occasion un parser/mapper XML en mode stream, permettant de lire de manière séquentielle un fichier XML et de mapper les informations voulues vers un objet javascript.

/*
 * Exemple d'utilisation de la class XmlStreamMapper
 */
const xmlMapper = new XmlStreamMapper('/var/data/drug_bank_full_database.xml');
await xmlMapper.parseMap({
    generic: 'drugbank.drug.name', // Define XML path with dot notation
    synonyms: ['drugbank.drug.synonyms.synonym'],
    brands: ['drugbank.drug.international-brands.international-brand.name'],
    products: [{
        name: 'drugbank.drug.products.product.name',
        route: 'drugbank.drug.products.product.route'
    }]
}, async item => {
    // Do something here with each item, which has the interface defined in the mapper : {generic: string, synonyms: string[], brands: string[], products: {name: string, route: string}[]}
});

Concurrence vs Parallelisme

A l’origine, Node.js est un environnement évenementiel qui a été conçu pour gérer la charge de manière concurrente et asynchrone. La plupart du temps cela fonctionne très bien, mais lorsque l’on a besoin d’effectuer de nombreuses tâches bloquantes utilisant le CPU de manière intensive, il devient très intéressant de paralleliser ces tâches afin de gagner du temps, grâce aux worker threads.

Pour tirer parti de cette possibilité, j’ai créé une classe WorkerPool qui permet d’initialiser un nombre choisi de worker, qui vont venir executer les tâches enregistrées dans une queue de manière asynchrone. La communication entre les worker threads et le main thread est standardisée et permet notamment une initialisation asynchrone des workers. Cela est intéressant car les worker sont executés dans des contextes isolés, et l’on ne peut envoyer que des messages pouvant être deep copied, ce qui implique entre autre d’initialiser l’accès à la base de données dans chaque worker, ce qui est fait de manière asynchrone.

/*
 * Exemple d'utilisation d'un WorkerPool
 */
const workerPool = new WorkerPool(cpus().length, './build/src/workers/ETLAdverseEventsWorker.js');

await Promise.all(partitions.map(async (partition, i) => {
  const task: ETLAdverseEventsTask = {
    taskId: 'partition_' + i.toString(),
    url: partition.file,
    filename: 'drug_events_' + i
  };

  await workerPool.scheduleTask(task);
}));

await workerPool.releaseWorkers();

Conclusion et évolutions envisagées

Si aujourd’hui la plupart des outils d’ETL et d’analyse de données sont plutôt dans l’écosystème Python, nous avons montré qu’il était possible de faire au moins aussi bien avec Node.js. De plus, le framework est entièrement conteneurisé via Docker et permet donc d’être lancé sur virtuellement n’importe quelle machine de manière simple et reproductible. Cela constitue d’ailleurs la prochaine étape pour notre client, qui souhaite mettre en place ce projet dans une infrastructure cloud tel que AWS EC2, ce qui lui permettra de lancer les conteneurs à la demande pour effectuer les mises à jour de la base de données chaque trimestre.

Rémi Tod @ KZS LAB

(1) Data-Driven Prediction of Drug Effects and Interactions – Nicholas P. Tatonetti, Patrick P. Ye, Roxana Daneshjou, and Russ B. Altman

(2) Data-Driven Prediction of Drug Effects and Interactions, Supplementary Materials – Nicholas P. Tatonetti, Patrick P. Ye, Roxana Daneshjou, and Russ B. Altman