
Writing into file under NFS in Grails app using Quartz and RabbitMQ

https://www.devze.com 2023-02-14 19:21 (source: web)
We have a Grails app that uses RabbitMQ for passing messages. The architecture we have for the production environment is:

- 2 web servers, let's say web1, web2. Both of them run an instance of a grails app

- a RabbitMQ server is installed on only one of the servers, web1

- In the RabbitMQ configuration in the Grails app, we have 10 consumers (per application instance)

We have a Quartz job that creates some messages and passes them to the queue, and we have a consumer service that handles the messages getting in the queue.

1. How can we define which server (application instance) should execute the Quartz job? I assume only one server runs the job and Quartz takes care of that.

2. How can we define which server handles the messages from the queue?

The problem is that the methods that consume the queue messages eventually write some rows to a .csv file, which is under NFS.

Initially we had some issues with the actual writing to the .csv file. There were some "broken", half-written rows in the file, but we resolved this by adding @Synchronized to the method that wrote to the .csv file. Now the issue is that some rows just don't get written at all.
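One thing worth noting: @Synchronized only serializes threads inside a single JVM. Since web1 and web2 each run their own app instance, a JVM-level lock cannot prevent the two processes from interleaving writes to the same NFS file. Below is a hedged sketch (in Java, with hypothetical names) of OS-level file locking via NIO `FileLock`, which serializes writers across processes; note that on NFS such locks are advisory and their reliability depends on the NFS version and lock-daemon setup, so the other common fix is to funnel all writes through a single writer node.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.List;

public class CsvAppender {

    // Appends rows while holding an exclusive OS-level lock on the file,
    // so writers in *different* JVMs (web1, web2) are serialized too --
    // something @Synchronized alone cannot do.
    public static void appendRows(Path csv, List<String> rows) throws IOException {
        try (FileChannel channel = FileChannel.open(csv,
                StandardOpenOption.CREATE,
                StandardOpenOption.WRITE,
                StandardOpenOption.APPEND);
             FileLock lock = channel.lock()) { // blocks until the lock is granted
            StringBuilder sb = new StringBuilder();
            for (String row : rows) {
                sb.append(row).append(System.lineSeparator());
            }
            channel.write(ByteBuffer.wrap(
                    sb.toString().getBytes(StandardCharsets.UTF_8)));
        } // lock and channel are released here, even on exception
    }

    public static void main(String[] args) throws IOException {
        Path csv = Path.of("out.csv");
        Files.deleteIfExists(csv);
        appendRows(csv, List.of("a,1", "b,2"));
        appendRows(csv, List.of("c,3"));
        System.out.println(Files.readAllLines(csv)); // [a,1, b,2, c,3]
    }
}
```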

Any ideas? I am not sure whether this is a programming issue, and if so how could it be resolved, or if it is an architectural issue.

UPD:@Olexandr

Initially I had (example)

def getStringsToWrite(File file, List someOtherList) {
  def stringsList = []
  someOtherList.each {
    def someString = "someString"
    stringsList << someString
  }
  writeRowsToFile(file, stringsList)
}

@Synchronized
def writeRowsToFile(File file, List stringsList) {
  file.withWriterAppend { out ->
    stringsList.each { row ->
      out.writeLine row
    }
  }
}

And it was not working "properly".

Now, I changed the code to something like:

class someServiceClass { //singleton
  LinkedBlockingQueue csvWritingQueue = new LinkedBlockingQueue()

  def getStringsToWrite(File file, List someOtherList) {
    def stringsList = []
    someOtherList.each {
      def someString = "someString"
      csvWritingQueue.put(someString)
    }
    writeRowsToFile(file)
  }

  @Synchronized
  def writeRowsToFile(File file) {
    file.withWriterAppend { out ->
      while (!csvWritingQueue.isEmpty()) {
        out.writeLine csvWritingQueue.poll()
      }
    }
  }
}
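One subtlety in this revised code: the queue is shared across all callers, but the target file is passed per call, so if different callers ever pass different files, rows enqueued for one file can be drained into another; the `!isEmpty()` / `poll()` pair can also race with concurrent `put()` calls. A hedged sketch (Java, hypothetical names) of draining atomically into a local batch with `BlockingQueue.drainTo`, which avoids the check-then-poll race:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

public class QueueDrainDemo {

    static final LinkedBlockingQueue<String> csvWritingQueue = new LinkedBlockingQueue<>();

    // drainTo moves everything currently queued into a local list in one
    // operation, so the caller works on a stable snapshot instead of
    // racing isEmpty()/poll() against concurrent put() calls.
    static List<String> drainPending() {
        List<String> batch = new ArrayList<>();
        csvWritingQueue.drainTo(batch);
        return batch;
    }

    public static void main(String[] args) throws InterruptedException {
        csvWritingQueue.put("row1");
        csvWritingQueue.put("row2");
        System.out.println(drainPending()); // [row1, row2]
        System.out.println(drainPending()); // []
    }
}
```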

Please let me know if you need any additional info. I tried to keep it at a high level perspective, but I will be more than happy to include some code in the post.

Thanks in advance,

Iraklis


I'm guessing that it could happen because some threads (handlers) are suspended due to a timeout while waiting on the synchronized lock.

But this is only my guess, and it depends on the application architecture (how you organized the queues, handlers, etc.).
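For what it's worth, a plain `synchronized` block (what @Synchronized generates) never times out on its own; a waiting handler eventually gets the lock and writes. Rows can be silently dropped, though, if any code path acquires the lock with a timeout and skips the write on failure. A hedged sketch (Java, hypothetical names) of that failure mode using `ReentrantLock.tryLock`:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

public class TimeoutDropDemo {

    static final ReentrantLock fileLock = new ReentrantLock();
    static final List<String> written = new ArrayList<>();

    // If the lock is not acquired within the timeout, the row is silently
    // dropped -- the suspected failure mode. Plain synchronized never does
    // this; it waits indefinitely.
    static boolean tryWrite(String row, long timeoutMs) throws InterruptedException {
        if (fileLock.tryLock(timeoutMs, TimeUnit.MILLISECONDS)) {
            try {
                written.add(row);
                return true;
            } finally {
                fileLock.unlock();
            }
        }
        return false; // row lost
    }

    public static void main(String[] args) throws Exception {
        fileLock.lock(); // main thread simulates a writer holding the lock
        Thread t = new Thread(() -> {
            try {
                tryWrite("row1", 50); // times out in the other thread -> dropped
            } catch (InterruptedException ignored) {}
        });
        t.start();
        t.join();
        fileLock.unlock();
        System.out.println(written); // [] -- row1 was silently lost
        tryWrite("row2", 50);        // lock is free now -> succeeds
        System.out.println(written); // [row2]
    }
}
```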

UPD: I have some sort of idea. Writing to the CSV file is not thread safe, which is why you are using a synchronization lock, and maybe that is why some threads are suspended while waiting. But we could solve it easily by creating a WriteManager class. This class holds a BlockingQueue instance, so handlers do not invoke the write method directly; instead they put the data to be written into the queue (if the queue is full they have to wait, but if the queue is big enough they won't have to wait at all), and the manager class polls the queue in an infinite loop (of course it will be finite and controlled by a flag) and writes the data to the CSV file. So when you have to start the writing/managing process you just start the WriteManager, and when no more writing is needed you switch off the polling loop (actually the loop shouldn't stop the moment you stop it manually; it should poll all remaining objects from the queue and only then stop).

UPD2: Do you invoke getStringsToWrite multiple times? This is a very basic example of what I mean, but you could easily extend it or even fully rewrite it.

class someServiceClass { //singleton
  LinkedBlockingQueue csvWritingQueue = new LinkedBlockingQueue()
  def currentWriter = null
  def writerThread = null

  def getStringsToWrite(List someOtherList) {
    def stringsList = []
    someOtherList.each {
      def someString = "someString"
      csvWritingQueue.put(someString)
    }
  }

  @Synchronized
  def writeRowsToFile(File file) {
    file.withWriterAppend { out ->
      while (!csvWritingQueue.isEmpty()) {
        out.writeLine csvWritingQueue.poll()
      }
    }
  }

  def write(File file) {
    if (writerThread == null) {
      currentWriter = new Runnable() {
        boolean isRun = true
        public void run() {
          while (isRun) {
            if (!csvWritingQueue.isEmpty()) {
              writeRowsToFile(file)
            }
            try {
              Thread.sleep(5 * 1000)
            } catch (e) {
              //e.printStackTrace()
            }
          }
        }
      }
      writerThread = new Thread(currentWriter)
      writerThread.start()
    }
  }

  def stop() {
    if (currentWriter != null) {
      currentWriter.isRun = false
      try {
        writerThread.join()
      } catch (e) {
        //e.printStackTrace()
      }
    }
    currentWriter = null
    writerThread = null
  }
}

I hope it would help you.

The BackgroundThread plugin has a nice example of a background worker.


Are you asking how to configure Quartz in a cluster? http://www.quartz-scheduler.org/docs/configuration/ConfigJDBCJobStoreClustering.html
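For question 1, the standard way to ensure exactly one node fires each Quartz job is the clustered JDBC JobStore described at that link. A minimal `quartz.properties` sketch (the scheduler and datasource names here are placeholders; the datasource must point at a database shared by web1 and web2):

```properties
# Each node needs a unique, auto-generated instance id
org.quartz.scheduler.instanceName = MyClusteredScheduler
org.quartz.scheduler.instanceId = AUTO

# Store jobs in a shared database so the cluster can coordinate
org.quartz.jobStore.class = org.quartz.impl.jdbcjobstore.JobStoreTX
org.quartz.jobStore.driverDelegateClass = org.quartz.impl.jdbcjobstore.StdJDBCDelegate
org.quartz.jobStore.dataSource = myDS
org.quartz.jobStore.isClustered = true
org.quartz.jobStore.clusterCheckinInterval = 20000
```

With this in place, only one node acquires each trigger firing. Question 2 is different: all consumers on both nodes compete for messages from the shared queue, and RabbitMQ distributes messages among them, so either node's handlers may end up writing to the NFS file.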

