Πώς να χρησιμοποιήσετε το Redis ως Job Scheduler

Πώς να χρησιμοποιήσετε το Redis ως job scheduler

Κατά τη διάρκεια της κατασκευής του needit.gr εμφανίστηκε γρήγορα το πρόβλημα του προγραμματισμού εργασιών. Η αποστολή ειδοποιήσεων μέσω email, η ακύρωση παραγγελιών μετά από ορισμένο χρονικό διάστημα και πολλές άλλες εργασίες που πρέπει να εκτελεστούν σε ακριβείς ημερομηνίες μετά τη σύλληψή τους, είναι μερικά από τα λίγα παραδείγματα εργασιών που προκύπτουν κατά τη δημιουργία μιας εφαρμογής μεγάλης κλίμακας. Για την αντιμετώπιση τέτοιων προβλημάτων προγραμματισμού, οι προγραμματιστές δημιούργησαν βιβλιοθήκες προγραμματισμού εργασιών όπως Bull, Agenda, Bee-queue και πολλές άλλες. Αλλά σήμερα θα ήθελα πολύ να ρίξω μια ματιά στον δικό μας προγραμματιστή εργασίας Node.js που ονομάζεται Hermes και εν τω μεταξύ να κάνω μια βαθιά κατάδυση σχετικά με τον τρόπο λειτουργίας του Redis και τις τροποποιήσεις και τα hacks που μπορούμε να χρησιμοποιήσουμε για να ξεκλειδώσουμε τις δυνατότητες προγραμματισμού εργασιών του.

Τι είναι το Redis;

Redis

Το Redis είναι ένας χώρος αποθήκευσης δομών δεδομένων στη μνήμη, που χρησιμοποιείται ως κατανεμημένη βάση δεδομένων κλειδιού-τιμής στη μνήμη. Το έργο αναπτύχθηκε και συντηρήθηκε από τον Salvatore Sanfilippo, ξεκινώντας το 2009. Το Redis χρησιμοποιείται από εταιρείες όπως το Twitter, το Github, το Snapchat, το Craigslist, το StackOverflow και είναι πολύ πιθανό να επιλέξατε το Redis ως προσωρινή μνήμη για το τελευταίο μεγάλο έργο σας λόγω της ταχύτητας , αξιοπιστία και ευκολία χρήσης που παρέχει.

Γι' αυτό επιλέξαμε και εμείς το Redis για να δημιουργήσουμε έναν προγραμματιστή εργασιών.

  • Χρησιμοποιούσαμε ήδη το Redis, επομένως δεν είχαμε επιπλέον έξοδα αναπτύσσοντας κάτι εντελώς νέο.
  • Το Redis είναι εξαιρετικά γρήγορο, είναι μια βάση δεδομένων στη μνήμη και η πρόσβαση στη μνήμη μπορεί να είναι χιλιάδες φορές ταχύτερη από την πρόσβαση στο δίσκο.
  • Το Redis είναι αξιόπιστο, χρησιμοποιείται από εταιρείες και προϊόντα που χρησιμοποιούνται καθημερινά από εκατομμύρια.
  • Είναι επεκτάσιμο, υποστηρίζοντας χαρακτηριστικά όπως το replication.
  • Το Redis είναι εύκολο στη χρήση. Είναι βασικά ένας καλύτερος πίνακας κατακερματισμού.

Καθορισμός έναν job scheduler

Ο προγραμματιστής εργασιών είναι μια εφαρμογή υπολογιστή για τον έλεγχο της εκτέλεσης εργασιών χωρίς παρακολούθηση του προγράμματος παρασκηνίου.

Ας ορίσουμε τις δυνατότητες που χρειαζόμαστε από έναν job scheduler.

  • Πρέπει να είμαστε σε θέση να δημιουργήσουμε εργασίες, να ορίσουμε μια ημερομηνία ή ένα χρονόμετρο για εκτέλεση και να περιμένουμε από τον προγραμματιστή μας να το κάνει.
  • Οι εργασίες θα πρέπει να παραμένουν ακόμα και αν ο διακομιστής μας επανεκκινήσει.
  • Θα πρέπει να μπορούμε να σταματήσουμε μια προγραμματισμένη εργασία.
  • Εφόσον χρησιμοποιούμε το Redis, χρειαζόμαστε έναν τρόπο σύνδεσης, αποσύνδεσης και αλληλεπίδρασης με τη βάση δεδομένων Redis.

Προαπαιτούμενα

Θα μπορούσατε να χρησιμοποιήσετε οποιαδήποτε γλώσσα, framework ή σύστημα θέλετε, καθώς η θεωρία παραμένει η ίδια. Για τη συγκεκριμένη υλοποίηση, χρησιμοποιούμε Node.js και TypeScript στα Linux. Για να ακολουθήσετε αυτό το σεμινάριο θα χρειαστείτε Node, yarn και Redis ≤ 6.0. Η εγκατάσταση αυτών είναι πέρα από το πεδίο αυτού του σεμιναρίου.

Σύνδεση με το Redis

Το πρώτο βήμα είναι η δημιουργία μιας σύνδεσης Redis. Για αυτό, χρησιμοποιούμε ένα πακέτο που ονομάζεται ioredis, το οποίο μπορεί να εγκατασταθεί χρησιμοποιώντας την ακόλουθη εντολή:

yarn add ioredis yarn add -D @types/node # for typescript

Υποθέτοντας ότι έχετε το Redis είτε σε κοντέινερ είτε σε διακομιστή, μπορούμε να δημιουργήσουμε μια κλάση και να δημιουργήσουμε μια σύνδεση στον constructor. Μπορούμε επίσης να ορίσουμε μια λειτουργία «κλείσιμο» για το κλείσιμο της σύνδεσης.

import Redis from "ioredis"; interface HermesConstructor { host?: string; port?: number; db?: number; } export class Hermes { private redis: Redis; constructor({ host = "127.0.0.1", port = 6379, db = 0 }: HermesConstructor) { const initRedis = (): Redis => new Redis({ host, port, db, }); this.redis = initRedis(); } close() { this.redis.disconnect(); } }

Αυτό που θα χρησιμοποιήσουμε για τον προγραμματιστή μας είναι μια δυνατότητα Redis που ονομάζεται EXPIRE, η οποία θα λήξει ένα δεδομένο κλειδί μετά από ένα ορισμένο χρονικό διάστημα. Η λήξη του Redis λειτουργεί ταξινομώντας τα κλειδιά κατά χρόνο λήξης σε ένα δέντρο radix, αλλά αυτό είναι ένα χαρακτηριστικό διαθέσιμο στην έκδοση Redis 6. Πριν από αυτό, ένας προγραμματιστής εργασιών δεν θα ήταν τόσο αξιόπιστος, καθώς η παλιά μέθοδος ήταν πιο lazy, κυρίως ελέγχοντας εάν ένα κλειδί έχει λήξει αφού προσπαθήσετε να αποκτήσετε πρόσβαση σε αυτό ή ελέγχοντας τυχαία ορισμένα κλειδιά.

Για να ακούσουμε για μια λήξη κλειδιού, χρειαζόμαστε μια άλλη σύνδεση Redis για να λειτουργήσει ως ακροατής. Ο ακροατής που χρησιμοποιεί τη μέθοδο "psubscribe" μπορεί να ακούσει σε ένα δεδομένο κανάλι όλα τα κλειδιά που έχουν λήξει. Δεδομένου ότι μπορούμε να χρησιμοποιήσουμε το Redis για άλλα πράγματα εκτός από έναν προγραμματιστή, πρέπει να ορίσουμε μια "υπογραφή" για τα κλειδιά μας ώστε να μπορούμε να τα διαφοροποιούμε από τα εξωτερικά στοιχεία. Σε αυτό το παράδειγμα, θα χρησιμοποιήσουμε τη συμβολοσειρά "hermes" ως την υπογραφή μας και θα φιλτράρουμε κάθε κλειδί που δεν ξεκινά με αυτήν την υπογραφή.

import Redis from "ioredis"; interface HermesConstructor { host?: string; port?: number; db?: number; } export class Hermes { private redis: Redis; /* set up a signature */ private signature = "hermes"; /* our listener */ private listener: Redis; constructor({ host = "127.0.0.1", port = 6379, db = 0 }: HermesConstructor) { const initRedis = (): Redis => new Redis({ host, port, db, }); this.redis = initRedis(); /* after a connection is established enable keyspace event notifications for expired keys this way the listener will be able to listen for them */ this.redis.on("ready", () => this.redis.config("SET", "notify-keyspace-events", "Ex") ); /* establish a second connection */ this.listener = initRedis(); /* subscribe to the channel and listen for expirations */ this.listener.psubscribe("__keyevent@" + db + "__:expired"); this.listener.on("pmessage", (_, __, message) => { /* if the expired key starts with the signature we set */ if (message.startsWith(`${this.signature}:`)) { /* CODE */ } }); } close() { this.redis.disconnect(); /* close connection */ this.listener.disconnect(); } }

Χειριστές

Οι χειριστές είναι οι λειτουργίες που θα χειριστούν τις εργασίες όταν λήξει μια εργασία. Αυτό μπορεί να είναι η αποστολή ενός email ή μια εμφάνιση στην κονσόλα. Πρώτα, θα χρειαστούμε μια δομή για να διατηρήσουμε τους χειριστές μας, στο Node μπορούμε απλώς να κρατήσουμε object. Στη συνέχεια, χρειαζόμαστε έναν τρόπο να δημιουργήσουμε χειριστές.

import Redis from "ioredis"; interface HermesConstructor { host?: string; port?: number; db?: number; } export class Hermes { private redis: Redis; private signature = "hermes"; private listener: Redis; /* an object to keep out handlers */ private handlers: Record<string, (args: unknown) => unknown>; constructor({ host = "127.0.0.1", port = 6379, db = 0 }: HermesConstructor) { const initRedis = (): Redis => new Redis({ host, port, db, }); this.redis = initRedis(); this.redis.on("ready", () => this.redis.config("SET", "notify-keyspace-events", "Ex") ); this.listener = initRedis(); this.listener.psubscribe("__keyevent@" + db + "__:expired"); this.listener.on("pmessage", (_, __, message) => { if (message.startsWith(`${this.signature}:`)) { /* CODE */ } }); } close() { this.redis.disconnect(); this.listener.disconnect(); } /* add a new handler or update an existing one */ addHandler({ name, func }: { name: string; func: (args: any) => void }) { if (name.includes("__")) { throw new Error(`ERROR: Handler cannot contain "__".`); } if (this.handlers === undefined) { this.handlers = {}; } this.handlers[name] = func; } }

Προγραμματίστε εργασίες και διατηρήστε δεδομένα μετά τη λήξη ενός κλειδιού

Το επόμενο βήμα φαίνεται απλό. Δημιουργήστε ένα κλειδί με καθορισμένο χρόνο λήξης. Χρησιμοποιήστε την υπογραφή στην αρχή του κλειδιού, προσθέστε τον χειριστή ως τιμή και όταν λήξει το κλειδί, λάβετε την τιμή (δηλαδή τον χειριστή) και τελειώστε.

Δυστυχώς, δεν είναι τόσο απλό, καθώς όταν ένα κλειδί λήγει, έχει λήξει. Δεν μπορείτε να αποκτήσετε πρόσβαση στην τιμή του.

Μια απλή λύση είναι η χρήση ενός κλειδιού shadow. Όταν δημιουργούμε το πρώτο ζεύγος κλειδιού-τιμής, δημιουργήστε ένα shadow αντίγραφο του ζεύγους μας με λήξη μεγαλύτερη από την πραγματική. Όταν λήξει το αρχικό κλειδί, λάβετε την τιμή από το ζεύγος σκιών που δεν έχει ακόμη λήξει. Λοιπόν το πρόβλημα διορθώθηκε!

Στη συνέχεια, πρέπει να αποφασίσουμε ποιες πληροφορίες χρειάζεται η εργασία μας. Πρώτα χρειαζόμαστε το όνομα του χειριστή, μια λίστα με ορίσματα που χρειάζεται ο χειριστής, ένα αναγνωριστικό για να το διαφοροποιήσουμε από άλλα σε περίπτωση που χρειαστεί να το ακυρώσουμε και, τέλος, μια λήξη.

Η μορφή ζεύγους κλειδιών που θα χρησιμοποιήσουμε είναι:

  • Key: signature:handler__id
  • Value: αδειάζω
  • Shadow key: shadow:signature:handler__id
  • Shadow value: ορίσματα handler
import Redis from "ioredis"; interface HermesConstructor { host?: string; port?: number; db?: number; } export class Hermes { private redis: Redis; private signature = "hermes"; private listener: Redis; /* an object to keep out handlers */ private handlers: Record<string, (args: unknown) => unknown>; constructor({ host = "127.0.0.1", port = 6379, db = 0 }: HermesConstructor) { const initRedis = (): Redis => new Redis({ host, port, db, }); this.redis = initRedis(); this.redis.on("ready", () => this.redis.config("SET", "notify-keyspace-events", "Ex") ); this.listener = initRedis(); this.listener.psubscribe("__keyevent@" + db + "__:expired"); this.listener.on("pmessage", (_, __, message) => { if (message.startsWith(`${this.signature}:`)) { /* CODE */ } }); } close() { this.redis.disconnect(); this.listener.disconnect(); } /* add a new handler or update an existing one */ addHandler({ name, func }: { name: string; func: (args: any) => void }) { if (name.includes("__")) { throw new Error(`ERROR: Handler cannot contain "__".`); } if (this.handlers === undefined) { this.handlers = {}; } this.handlers[name] = func; } async schedule({ handler, args, id, expiration, }: { handler: string; args?: Record<string, string | number>; id: string | number; expiration: number; }) { /* validate */ if (handler.includes("__")) { throw new Error(`ERROR: Handler cannot contain "__".`); } if (this.handlers?.[handler] === undefined) { throw new Error(`ERROR: Handler "${handler}" does not exist.`); } /************/ this.redis.set( `${this.signature}:${handler}__${id}`, "", "EX", Math.round(expiration) ); this.redis.set( `shadow:${this.signature}:${handler}__${id}`, JSON.stringify(args), "EX", Math.round(expiration) + 60 ); } }

Εκτέλεση εργασιών

Τέλος, μπορούμε να προσθέσουμε μια μέθοδο για την εκτέλεση του κατάλληλου χειριστή και να καλέσουμε αυτήν τη μέθοδο όταν ο ακροατής εντοπίσει ότι ένα κλειδί έληξε.

Μπορούμε επίσης να προσθέσουμε μια μέθοδο για την ακύρωση μιας εργασίας, δεδομένου του ονόματος χειριστή και του αναγνωριστικού της. Επίσης, μπορούμε να προσθέσουμε μια μέθοδο για τον προγραμματισμό μιας εργασίας σε μια συγκεκριμένη ημερομηνία.

Ο τελικός κώδικας, συμπεριλαμβανομένου ενός δείγματος εκτέλεσης, θα πρέπει να μοιάζει κάπως έτσι:

import Redis from "ioredis"; interface HermesConstructor { host?: string; port?: number; db?: number; } export class Hermes { private redis: Redis; private signature = "hermes"; private listener: Redis; /* an object to keep out handlers */ private handlers: Record<string, (args: unknown) => unknown>; constructor({ host = "127.0.0.1", port = 6379, db = 0 }: HermesConstructor) { const initRedis = (): Redis => new Redis({ host, port, db, }); this.redis = initRedis(); this.redis.on("ready", () => this.redis.config("SET", "notify-keyspace-events", "Ex") ); this.listener = initRedis(); this.listener.psubscribe("__keyevent@" + db + "__:expired"); this.listener.on("pmessage", (_, __, message) => { if (message.startsWith(`${this.signature}:`)) { /* execute the appropriate handler */ this.execute(message); } }); } close() { this.redis.disconnect(); this.listener.disconnect(); } /* add a new handler or update an existing one */ addHandler({ name, func }: { name: string; func: (args: any) => void }) { if (name.includes("__")) { throw new Error(`ERROR: Handler cannot contain "__".`); } if (this.handlers === undefined) { this.handlers = {}; } this.handlers[name] = func; } async schedule({ handler, args, id, expiration, }: { handler: string; args?: Record<string, string | number>; id: string | number; expiration: number; }) { /* validate */ if (handler.includes("__")) { throw new Error(`ERROR: Handler cannot contain "__".`); } if (this.handlers?.[handler] === undefined) { throw new Error(`ERROR: Handler "${handler}" does not exist.`); } /************/ this.redis.set( `${this.signature}:${handler}__${id}`, "", "EX", Math.round(expiration) ); this.redis.set( `shadow:${this.signature}:${handler}__${id}`, JSON.stringify(args), "EX", Math.round(expiration) + 60 ); } scheduleAt({ handler, args, id, expiration, }: { handler: string; args?: Record<string, string | number>; id: string | number; expiration: Date; }) { const diffInSeconds = (expiration.getTime() - new Date().getTime()) / 1000; if (diffInSeconds <= 0) { throw new Error("Error: Date must be in the future"); } this.schedule({ handler, args, id, expiration: diffInSeconds }); } /* detect handler and execute */ private async execute(key: string) { const handler = new RegExp(`${this.signature}:(.*)__`, "g").exec(key); const args = JSON.parse(await this.redis.get(`shadow:${key}`)); if (this.handlers[handler[1]] === undefined) { throw new Error(`ERROR: Handler "${handler[1]}" does not exist.`); } this.handlers[handler[1]](args); this.redis.del(`shadow:${key}`); } cancel({ handler, id }: { handler: string; id: string }) { const key: string = `${this.signature}:${handler}__${id}`; this.redis.del(key); this.redis.del(`shadow:${key}`); } } /************** execution *************/ const hermes = new Hermes({ host: "127.0.0.1", port: 6379, }); const main = async () => { // Create a handler hermes.addHandler({ name: "console_log", func: ({ message }) => console.log(message), }); // Create job hermes.schedule({ // Choose a handler handler: "console_log", // Add a unique Id id: 13, // Expire in 45 seconds expiration: 45, // Add arguments for handler function args: { message: "hello world" }, }); }; main();

Μπορείτε να βρείτε τον πλήρη πηγαίο κώδικα και το hermes ως πακέτο npm στους παρακάτω συνδέσμους:

Ευχαριστώ για την ανάγνωση! Ελπίζω να μάθατε κάτι νέο και να μη διστάσετε να χρησιμοποιήσετε το Hermes στο επόμενο έργο σας ή να συνεισφέρετε μέσω του Github.