These past two days have been spent dealing with files, and it has been very interesting: no step went smoothly, but solving each one paid off. The first problem is monitoring file changes. There are plenty of ways to do this, for example with the Filebeat tool from the ELK family, but an external tool is hard to integrate into Storm, so it is better to write your own Java monitoring program.

Introducing Java NIO file monitoring

In fact, JDK 7 and later offer a better choice: the WatchService monitor in the java.nio package. I see two advantages. First, it relies on the operating system's signal notification mechanism: when a watched directory changes, the OS sends a signal up to the application-layer monitor. This active notification from the OS is far more efficient than having the application repeatedly poll the file, and it does not consume many system resources. Second, the programming model does not require registering listeners in an observer pattern; the multithreading is hidden, and the client uses an intuitive blocking-loop API, which makes it very easy to embed into any running container to perform file collection and monitoring.
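In rough sketch form, that blocking-loop usage looks like this (the watched directory path here is only an illustrative assumption):

```java
import java.nio.file.*;

public class DirWatcher {
    public static void main(String[] args) throws Exception {
        // Hypothetical directory that the PostgreSQL cdc program writes into.
        Path dir = Paths.get("/data/cdc-output");
        WatchService watcher = FileSystems.getDefault().newWatchService();
        dir.register(watcher,
                StandardWatchEventKinds.ENTRY_CREATE,
                StandardWatchEventKinds.ENTRY_MODIFY);

        while (true) {
            WatchKey key = watcher.take();            // blocks until the OS signals a change
            for (WatchEvent<?> event : key.pollEvents()) {
                Path changed = dir.resolve((Path) event.context());
                System.out.println(event.kind() + ": " + changed);
                // hand the changed file over to the collection logic here
            }
            if (!key.reset()) {                       // the directory is no longer accessible
                break;
            }
        }
    }
}
```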

In addition, once a file change is detected, I read the file with a RandomAccessFile object. This file-access class is commonly used for requirements like resuming a transfer from a breakpoint, which is very convenient here; the key is to design a durable offset record file, so that after a restart the collector can always resume collecting from the position of the most recently collected change, as shown in the figure below.
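Roughly, the resumable read works like this (the offset file name and format below are only my illustration, not a fixed design):

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

public class ResumableReader {
    private final Path dataFile;    // the monitored cdc output file
    private final Path offsetFile;  // durable record of the last collected position

    public ResumableReader(Path dataFile, Path offsetFile) {
        this.dataFile = dataFile;
        this.offsetFile = offsetFile;
    }

    /** Reads everything appended since the last recorded offset, then persists the new offset. */
    public void readNewData() throws IOException {
        long offset = 0L;
        if (Files.exists(offsetFile)) {
            offset = Long.parseLong(
                    new String(Files.readAllBytes(offsetFile), StandardCharsets.UTF_8).trim());
        }
        try (RandomAccessFile raf = new RandomAccessFile(dataFile.toFile(), "r")) {
            raf.seek(offset);                         // jump straight to the last collected position
            String line;
            while ((line = raf.readLine()) != null) {
                System.out.println(line);             // pass the record to the downstream emitter
            }
            // persist the new position so a restart resumes from here
            Files.write(offsetFile,
                    Long.toString(raf.getFilePointer()).getBytes(StandardCharsets.UTF_8));
        }
    }
}
```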

After the file monitoring and collection function was embedded in the Storm cluster, a new problem appeared: a Storm spout instance does not run on the machine you choose; the cluster decides entirely which node it lands on, while the monitored file location is fixed. There is always a clumsy workaround: after the Storm cluster starts, find out which machine is running the spout and run the cdc file output program on that machine. But that couples the two far too tightly; every time Storm rearranges the spout instances you have to move the collection location, and the maintenance becomes cumbersome and error-prone.

Introducing the distributed file system

So I came up with a new hypothesis: solve the problem with a distributed file system (dfs). The choice of dfs matters a great deal. Hadoop HDFS is certainly not suitable, because it departs from the ordinary file system access model. In the end I picked two candidates, GlusterFS and MooseFS. Both can work with FUSE: by mounting the dfs onto a local directory, accessing the dfs becomes as seamless as accessing a local directory, and a change made to a file on any dfs client node becomes visible on all the other dfs client nodes. So I made every Storm node a dfs client; no matter which node the spout happens to run on, it can access the monitored files through the same directory on that node, and the monitored files also gain the high reliability of multiple replicas.

This is a solution that combines distributed computing with distributed storage: because the Storm compute nodes are assigned dynamically by the cluster, the spout's file collection location cannot be fixed, so I took the distributed file system route, mainly using GlusterFS mounted through Linux FUSE (filesystem in userspace) so that every spout node is a dfs client. Then, no matter which node the spout is assigned to, it can collect data from the replica of the PostgreSQL cdc output file by monitoring and reading that node's GlusterFS client mount directory.
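As a sketch of this hypothesis, the spout simply watches the same mount directory on whichever node it lands on (Storm 1.x package names; the mount path /mnt/glusterfs/cdc is only an example, and the limitation of exactly this approach is the bug described below):

```java
import java.io.IOException;
import java.nio.file.*;
import java.util.Map;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

public class CdcFileSpout extends BaseRichSpout {
    // The same directory exists on every node, because every node mounts GlusterFS here.
    private static final Path MOUNT_DIR = Paths.get("/mnt/glusterfs/cdc");

    private SpoutOutputCollector collector;
    private transient WatchService watcher;

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        try {
            watcher = FileSystems.getDefault().newWatchService();
            MOUNT_DIR.register(watcher,
                    StandardWatchEventKinds.ENTRY_CREATE,
                    StandardWatchEventKinds.ENTRY_MODIFY);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void nextTuple() {
        WatchKey key = watcher.poll();                // non-blocking, so nextTuple() returns quickly
        if (key == null) {
            return;
        }
        for (WatchEvent<?> event : key.pollEvents()) {
            Path changed = MOUNT_DIR.resolve((Path) event.context());
            collector.emit(new Values(changed.toString()));
        }
        key.reset();
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("changedFile"));
    }
}
```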

But testing revealed a big bug that genuinely shocked me; its cause is analyzed below.

The art of subduing the bug

The Java file monitor (WatchService) built into the spout watches for directory changes driven by signals from the operating system, so the spout can wait for a change notification. I took it for granted that even if the PostgreSQL cdc output node and the spout's monitoring node were different machines, the spout node would still sense changes to its local replica of the directory as long as the replica was synchronized through the distributed file system. In fact I was wrong: the WatchService in the spout cannot perceive changes to the directory replica at all. To receive the operating system's file change signal, the file must be read and written on the same machine; only then is a change signal delivered to the upper application. My earlier test had only appeared correct because the PostgreSQL output and the spout monitor happened to be on the same server.

Then the problem came back. My hypothesis was that the spout should not have to care about the directory location of the collection point; otherwise I would have to work backwards from whatever node address the Storm cluster assigns the spout to and point the pg monitoring there, which is obviously the process in reverse. I also tried MooseFS and NFS, with the same result, and NFS is not even as efficient as a distributed file system.

Just when there seemed to be no way out and I thought my hypothesis was about to fail, a new idea opened my mind: why did I insist on a spout parallelism of exactly 1? Since three machines participate in the Storm cluster topology, I set the spout parallelism to 3, so that every machine has a spout monitoring its local GlusterFS mount directory. Then no matter which node the PostgreSQL cdc output program is started on, only the spout that senses the replica change starts pushing data while the others wait. This solves the problem, and it also guarantees that if the PostgreSQL cdc file output moves to another node, the previous spout instance naturally goes back to waiting and the new spout takes over, still perfectly preserving redundancy and reliability between the PostgreSQL cdc program and the spouts.
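In sketch form, the topology wiring then looks like this (CdcFileSpout is the sketch shown earlier; the topology name and worker count are illustrative assumptions matching the three nodes mentioned above):

```java
import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;

public class CdcTopology {
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();

        // One spout executor per node: whichever node the PostgreSQL cdc program writes on,
        // the spout running there senses the change and emits; the other two simply wait.
        builder.setSpout("cdc-file-spout", new CdcFileSpout(), 3);
        // downstream bolts would be attached here with shuffleGrouping("cdc-file-spout")

        Config conf = new Config();
        conf.setNumWorkers(3);
        StormSubmitter.submitTopology("pg-cdc-topology", conf, builder.createTopology());
    }
}
```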