Let's say I have to read from a directory that has many large XML files in it, parse each one, send it to some service over the network, and then write the response to disk again.
If it were Java or C++ etc., I might do something like this (hope this makes sense):
(File read & xml parsing process) -> bounded-queue -> (sender process) -> service
service -> bounded-queue -> (process to parse result and write to disk)
And then I'd assign a suitable number of threads to each process. This way I can limit the concurrency of each process to its optimal value, and the bounded queues will ensure there won't be a memory shortage etc.
What should I do though when coding in Erlang? I guess I could just implement the whole flow in a function, then iterate over the directory and spawn these "start-to-end" processes as fast as possible. This sounds suboptimal though, because if parsing the XML takes longer than reading the files, the app could run into a memory shortage from holding many XML documents in memory at once, and you can't keep the concurrency at its optimal level. E.g. if the "service" is most efficient at a concurrency of 4, it would be very inefficient to hit it with enormous concurrency.
How should Erlang programmers deal with such a situation? I.e. what is the Erlang substitute for a fixed thread pool and a bounded queue?
There is no real way to limit the queue size of a process except by handling its messages in a timely fashion. The best approach is to check available resources before spawning and wait if they are insufficient. So if you are worried about memory, check memory before spawning a new process; if disk space, check disk space, etc.
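For example, the "check before spawning" idea could look roughly like this; the function name, the byte threshold, and the 100 ms retry interval are my own assumptions for illustration, not a standard API:

%% Spawn Fun only when the VM's total allocated memory (erlang:memory(total))
%% is below MaxBytes; otherwise wait a bit and check again.
spawn_when_memory_allows(Fun, MaxBytes) ->
    case erlang:memory(total) < MaxBytes of
        true ->
            spawn_link(Fun);
        false ->
            timer:sleep(100),
            spawn_when_memory_allows(Fun, MaxBytes)
    end.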
Limiting the number of processes spawned is also possible. A simple construction would be:
pool(Max) ->
    process_flag(trap_exit, true),
    pool(0, Max).

pool(Current, Max) ->
    receive
        {'EXIT', _, _} ->
            %% a linked worker exited; free up a slot
            pool(Current - 1, Max);
        {work, F, Pid} when Current < Max ->
            Pid ! accepted,
            spawn_link(F),
            pool(Current + 1, Max);
        {work, _, Pid} ->
            Pid ! rejected,
            pool(Current, Max)
    end.
This is a rough sketch of how a process could limit the number of processes it spawns. It is, however, considered better to limit based on the real resource constraints rather than on an arbitrary number.
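For completeness, a caller could submit work to such a pool roughly like this; the submit/2 helper and the one-second timeout are just a sketch around the accepted/rejected protocol shown above:

%% Hypothetical client side: ask the pool process to run Fun and wait for
%% its verdict. The one-second timeout is an arbitrary choice.
submit(PoolPid, Fun) ->
    PoolPid ! {work, Fun, self()},
    receive
        accepted -> ok;
        rejected -> {error, pool_full}
    after 1000 ->
        {error, timeout}
    end.

The pool itself could be started with something like PoolPid = spawn(fun() -> pool(4) end), and a rejected caller can decide whether to retry later, drop the work, or block until a slot frees up.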
You can definitely run your own process pool in Erlang, but it is a poor way to control memory usage, since it doesn't take into account the size of the XML data being read (or the total memory used by the processes, for that matter).
I would suggest implementing the whole workflow in a functional library, as you suggested, and spawning processes that execute this workflow. Add a check for memory usage that compares the size of the data to be read with the available memory (hint: use memsup).
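A rough sketch of such a check might look like this; it assumes the os_mon application (which provides memsup) is running and that free_memory is reported on your platform, and the 3x headroom factor is an arbitrary guess at parsing overhead:

%% Returns true if FileName looks small enough to read and parse right now.
%% Requires os_mon to be started, e.g. application:ensure_all_started(os_mon).
safe_to_read(FileName) ->
    MemData = memsup:get_system_memory_data(),
    Free = proplists:get_value(free_memory, MemData, 0),
    Size = filelib:file_size(FileName),
    %% leave headroom: a parsed XML document can be several times the file size
    Size * 3 < Free.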
I would suggest you do it in an event-driven paradigm.
Imagine you start an OTP gen_server with the list of file names.
1. The gen_server checks resources and spawns the next worker if permitted, removing a file name from the list and passing it to the worker.
2. The worker processes the file and casts a message back to the gen_server when it is done (or you can just trap EXIT).
3. The gen_server receives such a message and performs step 1 again, until the file list is empty.
So the workers do the heavy lifting, and the gen_server controls the flow.
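A minimal sketch of that flow as a gen_server is shown below; the module name, message format, and the process_file/1 placeholder are my own assumptions, and crash handling for workers (monitors or trap_exit) is omitted:

-module(file_pipeline).
-behaviour(gen_server).

-export([start_link/2]).
-export([init/1, handle_call/3, handle_cast/2]).

%% Files is the list of file names, MaxWorkers the concurrency limit.
start_link(Files, MaxWorkers) ->
    gen_server:start_link(?MODULE, {Files, MaxWorkers}, []).

init({Files, MaxWorkers}) ->
    {ok, maybe_spawn(#{files => Files, max => MaxWorkers, running => 0})}.

handle_call(_Request, _From, State) ->
    {reply, ok, State}.

%% Step 3: a worker reports that it is done; go back to step 1.
handle_cast({done, _File}, #{running := R} = State) ->
    {noreply, maybe_spawn(State#{running := R - 1})}.

%% Step 1: while files remain and we are under the limit, spawn workers.
maybe_spawn(#{files := [File | Rest], max := Max, running := R} = State)
        when R < Max ->
    Server = self(),
    spawn(fun() ->
                  process_file(File),                   %% read, parse, send, write
                  gen_server:cast(Server, {done, File}) %% step 2: report back
          end),
    maybe_spawn(State#{files := Rest, running := R + 1});
maybe_spawn(State) ->
    State.

%% Placeholder for the real per-file workflow.
process_file(_File) ->
    ok.

Started with file_pipeline:start_link(FileNames, 4), it never has more than four files in flight, which also addresses the "service is most efficient at concurrency 4" concern from the question.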
You can also create a distributed system, but it's a bit more complex, as you need to spawn intermediate gen_servers on each computer, query them to see whether resources are available there, and then choose which computer should process the next file based on the replies. And you probably need something like NFS to avoid sending long messages.
Workers can be further split if you need more concurrency.