There are probably two features that you may find interesting for a SWISH cluster:
Redis streams, to pick up unhandled tasks when a new server/machine joins the cluster (this video from the creator of Redis may give you good ideas). NOTE: this is better than Pub/Sub because it does not require the subscriber to be online: the client can get all unprocessed messages when it comes back online. See the XGROUP/XREADGROUP/XACK commands and the section on Consumer Groups.
Client tracking; with this feature the Redis server will notify the client when some other client changes a key (with an invalidate message). See the CLIENT TRACKING and SUBSCRIBE __redis__:invalidate commands and the client-side caching intro.
Thanks for the hints. I’m learning quickly. There are a lot of useful features in Redis; it seems popular and I like the fairly simple setup (really simple for a single node) and protocol. As with Prolog, there are quite a few ways to achieve a goal, and that requires some learning.
As for the library, I have added several high-level primitives to deal with lists and hashes and provided lazy-list support for the SCAN family of commands. I also did most of the work on doing the low-level message (de)serialization in C. Unfortunately that is over 10 times more code and probably more error-prone. I do expect it to be a couple of times faster though, which may be enough to make most Redis interaction (network) I/O bound rather than CPU bound.
One thing I’m still looking into is whether it is possible to set up a cluster of only a few nodes connected over high-latency WAN networking.
You can also try (for development mostly, but it may even be useful in production) to run several servers on one machine.
Did you do this for version 3 of the protocol (Redis 6)? It is much better since it allows for out-of-band information and also supports the different data types, instead of returning just flat arrays.
This is great!
I really like this too, this is how software is supposed to be!
I know that. What I’m looking for is to connect a few SWISH instances running in different parts of the planet and acting as a single instance without a single point of failure. That is hard. It seems the closest one can get is a master server in a pretty reliable place and slaves on the machines where the SWISH instances run. The next improvement would be to create a cluster for the master, which requires three independent nodes if you want to be able to shut down/reboot a single node without interrupting service.
No. The latest Ubuntu still ships with version 5. Thanks for the pointer. It seems this isn’t really hard and will indeed improve a couple of things.
Thanks for noting. Ubuntu 20.04 still ships (only) version 5; I’m now running v6 in a Docker image. I’ve pushed updates that (I think) support the entire version 3 protocol. Yes, notably the out-of-band data and the fact that the interface tells you an array is a key-value sequence help. Key-value sequences are now passed as Key-Value Prolog pairs and out-of-band push messages are handed to broadcast/1. Surely more things will change …
It is a pity that both keys and values are ultimately simple byte sequences. It would be nice if they could carry some type information.
Something to add to the API is support for streams, since they replace pub/sub for many use cases, as they allow the consumer to pick up messages that were delivered while it was offline. Streams have been available since Redis 5. Consumer groups in particular are very useful.
EDIT: During connection to the server do you automatically upgrade to version 3 if the server supports it?
Yes. I’m looking into these. I’m not really sure how we should support more high-level workflows. There are probably many ways to do so using Redis and thus, if you hook up a SWI-Prolog service in a network, you’ll have to comply with whatever was decided, so a high-level library will probably not help there. On the other hand, high-level libraries probably help a lot to build a cluster from SWI-Prolog services.
At the moment, no. You use redis_server/3 with the version(V) option, which together with the optional User and Password translates into a HELLO command.
Thanks. Still requires an extra round trip and parsing. That is fine for connections you keep open, but not so good for one-shot connect/query/disconnect operations. Well, maybe we can cache that based on the address …
Not sure I understand that. The generic interface lets you call any Redis command. Reading through the Redis streams intro, it seems there are three main scenarios for using streams:
1. Maintain and query a log of events, i.e., a timeline.
2. Provide an alternative to Redis’ publish/subscribe API that ensures messages get delivered to all clients, even if they are offline at the moment an event is published.
3. Distribute messages over a group of clients. This mode assigns messages to clients in a round-robin fashion. Clients confirm that a specific message has been handled. Live clients can inspect the stream for possibly dead clients and migrate their pending messages to other clients.
The first is too simple to worry about. The others are more complicated. Notably in the last scenario we have to worry about cleaning the stream, adding and removing consumers, etc. As I understand it, consumers have to test regularly whether the stream contains unprocessed messages that are queued for a consumer that does not respond, and claim these messages for live consumers (and possibly permanently remove a consumer that has been unresponsive for too long).
Redis seems to provide all the low-level support for an advanced brokering system. The high-level control needs to be added externally though. I’m thinking about a library that makes it easy to maintain a set of consumers.
You don’t have to do an extra round trip to the server: you can use the EVAL command to run a short and simple Lua script that calls redis.setresp(3), which sets the protocol. Within the script you can check whether the protocol upgrade worked and then continue; all of this happens on the server. Check the EVAL command and the RESP3 section within it for more info.
Speaking about performance, does library(redis) support a Unix socket connection? For a local server it roughly doubles throughput (e.g. we get 153K GETs/sec with a Unix socket vs 75K GETs/sec with TCP)!
Not yet. The socket library provides unix_domain_socket/1, so that should be trivial to add. First I’m struggling with consumer groups.
I thought performance was equal these days (at least on Linux). I typically like them because you do not have to reserve a port (which may conflict) and you can use file permissions to deal with security. Which OS is this?
@swi, as you seem to know a lot about redis, I’d like to share some stuff I’m struggling with.
I now have an (internal) library(redis_streams) that should provide high-level primitives to realise common workflows related to streams. The simplest one simply subscribes to a stream and uses broadcast to distribute the messages it gets from XREAD. This has a bit of error recovery in the sense that it will try to reconnect if the connection to the server gets lost. That is fairly trivial, but indeed something you surely do not want to repeat in each of your applications.
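The reconnect logic amounts to retrying with a growing, capped delay. A minimal sketch of such a schedule (a capped exponential backoff; this is an illustrative policy, not necessarily what library(redis) actually does):

```python
import itertools

def backoff_delays(initial=0.1, factor=2, max_delay=10.0):
    """Yield reconnect delays: exponential growth, capped at max_delay.
    Hypothetical policy, not the actual library(redis) schedule."""
    delay = initial
    while True:
        yield delay
        delay = min(delay * factor, max_delay)

# A reconnect loop would sleep for each delay between attempts:
schedule = list(itertools.islice(backoff_delays(), 8))
```

The cap matters: without it a long outage would make the client effectively stop retrying.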
The next one encapsulates XREADGROUP, calling broadcast_request/1 to dispatch the message and sending XACK back on success. Here we need quite a bit of failure recovery.
A node may fail. To deal with that you can add a timeout (Redis BLOCK), and after each timeout a node will run XPENDING and try to claim pending requests assigned to other nodes. That part is mostly working (needs stress testing). There are a couple of things I worry about:
Messages that get delivered too often can probably not be processed. What should we do? I guess we want to use XDEL on them, but possibly we need to inform someone that this happened. Provide a callback for this situation?
What happens if a node fails permanently? Does Redis keep sending new messages to it, so all these messages need to wait for the timeout to be claimed by some other node? If I decide it has been dead for more than X time, we should have the option to delete it from the network, I guess. How? There is an XGROUP command for that, but it discards possibly pending messages on that node if I read the docs correctly. Of course I can XCLAIM these first, but there seems to be an atomicity issue here, no?
Should we limit the number of messages we want to claim in a single idle processing slot? How? I can see at least three options:
1. do not claim more than N messages per idle slot;
2. do not spend more than T time processing such messages;
3. after each message, poll using XREADGROUP whether there are new messages targeted at this consumer.
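Options 1 and 2 can be combined in a single claim loop. A sketch of the budget logic (function name and defaults are made up; a real consumer would XCLAIM each message, process it and XACK it where indicated):

```python
import time

def process_pending(pending, handle, max_messages=10, max_seconds=0.5):
    """Process pending messages under a budget: stop after max_messages
    messages or max_seconds of wall time, whichever comes first.
    Illustrative only; in a real consumer, handle() would wrap
    XCLAIM + application processing + XACK."""
    deadline = time.monotonic() + max_seconds
    claimed = 0
    for msg in pending:
        if claimed >= max_messages or time.monotonic() > deadline:
            break
        handle(msg)
        claimed += 1
    return claimed

# Stand-in for a list of pending message ids from XPENDING:
done = []
n = process_pending(range(100), done.append, max_messages=3)
```

Bounding both count and time keeps a single consumer from monopolising recovery work while the rest of the stream goes unread.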
Finally, I think we need to provide some synchronous way to wait for a reply while using streams to dispatch jobs to a cluster. For example, I have a web server. It gets a request from a client, does an XADD to the Redis server, and some node picks this up and does the job. Now the web server needs to wait for the reply and use it to formulate a reply document for its client. How do we do that? Should the web server create a list, add its key to the message so the processing node can put the result there, and use e.g. BLPOP to wait for the result?
“So once the deliveries counter reaches a given large number that you chose, it is probably wiser to put such messages in another stream and send a notification to the system administrator. This is basically the way that Redis streams implement the concept of the dead letter.”
So I think we would: 1) put the dead letter in a different stream (the dead-letters stream), XACK’ing the message in the original stream, and 2) call the user-specified callback, which can do whatever it wishes. A sensible action in the callback would be to handle the dead letter (in the dead-letter stream) either by storing it somewhere for forensic analysis or by simply XACK’ing it and incrementing a counter to show how many dead letters have been discarded. You could also provide an option to XACK the message (in the dead-letter stream) automatically while incrementing a counter, to provide some default behaviour for simple consumers. I don’t think XDEL is a good idea, as it is not meant for this purpose.
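The decision itself is just a threshold on the delivery counter that XPENDING reports. A sketch of the dispatch rule (names and the threshold are hypothetical, not part of any existing API):

```python
def route_message(msg_id, delivery_count, max_deliveries=5):
    """Decide what to do with a pending message based on how often it
    has been (re)delivered, per its XPENDING delivery counter.
    'dead-letter' here means: XADD to the dead-letter stream, XACK in
    the original stream, then invoke the user callback."""
    if delivery_count >= max_deliveries:
        return 'dead-letter'
    return 'retry'

# A message already delivered 7 times goes to the dead-letter stream:
decision = route_message('1526569495631-0', 7)
```

Keeping the original message intact in the dead-letter stream (rather than XDEL’ing it) preserves it for forensic analysis, as described above.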
Another thing to consider is giving the user the option to have a stream with a set limit of messages; check the Capped Streams section in the intro to streams and also the “Recovering from permanent failures” section.
You can mean two things by “a node fails permanently” (I am assuming by “node” you mean a consumer):
The consumer is really down, and XREADGROUP is not being called, in this case the new messages will be delivered to other consumers. The old messages (that had been delivered to the now dead consumer) will be in the consumer group’s pending list and will eventually be picked up by some other consumer (through the mechanism you are implementing using XPENDING).
The consumer is failing to process the message continuously, but calling XREADGROUP, in this case we have the dead letter situation, and the message stays in the pending list for the consumer group. We can deal with it in the way specified above by putting the dead letter in a separate stream and executing the user call-back.
Note two things here: 1) Redis does not deliver messages to a consumer, but the consumer actively asks for a message using XREADGROUP (and then the message is put in the pending list for the consumer group until XACK’ed by the consumer to whom it was delivered). 2) the pending list of messages belongs to the consumer group, and not specifically to an individual consumer. Thus, when you delete a consumer from the group the messages remain in the consumer group’s pending list, and they need to be XCLAIM’ed by another consumer, thus there is no atomicity issue (if I understand the problem you stated correctly). The key here is that the pending list is not for a consumer, but for a consumer group.
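To make point 2 concrete, here is a toy in-memory model (purely illustrative; this is not how Redis is implemented) showing that pending entries belong to the group and survive their consumer: after the consumer that received a message disappears, another consumer can XCLAIM and then XACK it.

```python
class GroupModel:
    """Toy model of a consumer group's pending entries list (PEL)."""

    def __init__(self, stream):
        self.stream = list(stream)   # [(id, payload), ...]
        self.pending = {}            # id -> consumer it was delivered to
        self.cursor = 0              # position of the last-delivered entry

    def xreadgroup(self, consumer):
        """Deliver the next new message; it enters the *group's* PEL."""
        if self.cursor >= len(self.stream):
            return None
        msg_id, payload = self.stream[self.cursor]
        self.cursor += 1
        self.pending[msg_id] = consumer
        return msg_id, payload

    def xclaim(self, msg_id, consumer):
        """Reassign a pending entry to another consumer; nothing is lost."""
        self.pending[msg_id] = consumer

    def xack(self, msg_id):
        """Acknowledge: remove the entry from the group's PEL."""
        self.pending.pop(msg_id, None)

g = GroupModel([('1-0', 'job')])
g.xreadgroup('alice')      # delivered to alice; PEL holds '1-0' -> alice
# alice dies without acking; the entry stays in the group's PEL
g.xclaim('1-0', 'bob')     # bob takes it over
g.xack('1-0')              # bob acknowledges; the PEL entry is removed
```

The model leaves out ids, timeouts and delivery counters, but the ownership story is the one that matters for the atomicity question: deleting a consumer never deletes PEL entries.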
I think one way to do this (there are others) would be to 1) assign an ID to each web-server request (this is important since replies may come out of order), and 2) have workers (consumers) PUBLISH the reply as they finish processing a request (the key includes the ID); the web server can SUBSCRIBE and process replies as they come in (perhaps out of order). Pub/Sub is OK here because we presume we want to forget any replies that arrive while the web server is down.
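The correlation step of this scheme can be sketched as follows (the helper names are made up, and the actual XADD/PUBLISH/SUBSCRIBE traffic is left out; only the id bookkeeping is shown):

```python
import uuid

def make_request(payload):
    """Tag a web-server request with a unique id before dispatching it,
    so its reply can be recognised later (hypothetical helper)."""
    return {'id': str(uuid.uuid4()), 'payload': payload}

def match_replies(replies, waiting_ids):
    """Match published replies, possibly out of order, back to the
    requests the web server is still waiting for."""
    return {r['id']: r['result'] for r in replies if r['id'] in waiting_ids}

req_a = make_request('render page')
req_b = make_request('run query')
# Replies arrive in the opposite order of the requests:
replies = [{'id': req_b['id'], 'result': 'query-result'},
           {'id': req_a['id'], 'result': 'page-html'}]
matched = match_replies(replies, {req_a['id'], req_b['id']})
```

Because matching is by id rather than by arrival order, the web server can serve many concurrent HTTP requests over a single reply subscription.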
That’s what I can think of so far, but my head is boiling a little already.
P.S. I think, in general, we should keep in mind the Erlang principles for distributed systems: fail early, and use messaging (instead of shared data, but Redis already does this for us).
Thanks. That happens right now, but I’ll change the default. Not really sure how, but your description gives some hints.
Thanks. I thought I only needed to check the delivery count when claiming a message, but I should do so anyway it seems.
I’m not entirely convinced about that. XPENDING specifically allows getting the pending messages for a specific consumer. I need to read more carefully and experiment a little to fully understand when a message is assigned to a consumer (I think at the moment an XREADGROUP happens).
Yes and no, it seems. Redis is first of all a shared store. Then there is pub/sub, which is send-and-forget, and there is the stream machinery. The stream machinery greatly simplifies building a cooperating cluster, but it is stateful.
Thanks for the feedback; I’ll need to make a few changes as a result. I’ve pushed a new version including a library(redis_streams) that implements most of this. There is also a demo included that allows for multiple compute nodes and multiple client nodes. A bit silly, but I learned a lot from it.
There are also a few changes:
Allow unix(File) addresses for Unix-domain sockets (not yet tested).
Redefined redis/1 and redis/2. redis/1 now does what redis_cli/1 used to do (removed), and redis/2 takes a server and a request as arguments, ignoring the reply (but raising an exception if the reply is an error and failing if it is nil).
More comments are welcome (as well as tests, demos, etc.)