package com.jzmq.test;
/**
* @author Mariusz Ryndzionek
* @email mryndzionek@gmail.com
*
* Least-recently used (LRU) queue device
* Clients and workers are shown here in-process
*
* While this example runs in a single process, that is just to make
* it easier to start and stop the example. Each thread has its own
* context and conceptually acts as a separate process.
*
**/
import java.util.LinkedList;
import java.util.Queue;
import java.util.Random;
import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Context;
import org.zeromq.ZMQ.Poller;
import org.zeromq.ZMQ.Socket;
class ClientThread extends Thread
{
public void run()
{
Context context = ZMQ.context(1);
// Prepare our context and sockets
Socket client = context.socket(ZMQ.REQ);
// Initialize random number generator
Random srandom = new Random(System.nanoTime());
String id = String.format("%04x-%04x", srandom.nextInt(0x10000)+1,srandom.nextInt(0x10000)+1);
client.setIdentity(id.getBytes());
client.connect("ipc://frontend.ipc");
// Send request, get reply
client.send("HELLO".getBytes(), 0);
String reply = new String(client.recv(0));
System.out.println("Client: " + reply);
}
}
class WorkerThread extends Thread
{
public void run()
{
Context context = ZMQ.context(1);
// Prepare our context and sockets
Socket worker = context.socket(ZMQ.REQ);
// Initialize random number generator
Random srandom = new Random(System.nanoTime());
String id = String.format("%04x-%04x", srandom.nextInt(0x10000)+1,srandom.nextInt(0x10000)+1);
worker.setIdentity(id.getBytes()); // Makes tracing easier
worker.connect("ipc://backend.ipc");
// Tell backend we're ready for work
worker.send("READY".getBytes(), 0);
while(true)
{
String address = new String(worker.recv(0));
String empty = new String(worker.recv(0));
assert empty.length()==0 | true;
// Get request, send reply
String request = new String(worker.recv(0));
System.out.println("Worker: " + request);
worker.send(address.getBytes(), ZMQ.SNDMORE);
worker.send("".getBytes(), ZMQ.SNDMORE);
worker.send("OK".getBytes(), 0);
}
}
}
public class lruqueue {
public static void main(String[] args) {
Context context = ZMQ.context(1);
// Prepare our context and sockets
Socket frontend = context.socket(ZMQ.XREP);
Socket backend = context.socket(ZMQ.XREP);
frontend.bind("ipc://frontend.ipc");
backend.bind("ipc://backend.ipc");
int client_nbr;
for (client_nbr = 0; client_nbr < 10; client_nbr++)
new ClientThread().start();
int worker_nbr;
for (worker_nbr = 0; worker_nbr < 3; worker_nbr++)
new WorkerThread().start();
// Logic of LRU loop
// - Poll backend always, frontend only if 1+ worker ready
// - If worker replies, queue worker as ready and forward reply
// to client if necessary
// - If client requests, pop next worker and send request to it
//
// A very simple queue structure with known max size
Queue<String> worker_queue = new LinkedList<String>();
while (!Thread.currentThread().isInterrupted()) {
// Initialize poll set
Poller items = context.poller(2);
//ââ?Alwaysâpollâforâworkerâactivityâonâbackend
items.register(backend, Poller.POLLIN);
//ââ?Pollâfront-endâonlyâifâweâhaveâavailableâworkers
if(worker_queue.size()>0)
items.register(frontend, Poller.POLLIN);
items.poll();
// Handle worker activity on backend
if (items.pollin(0)) {
// Queue worker address for LRU routing
worker_queue.add(new String(backend.recv(0)));
// Second frame is empty
String empty = new String(backend.recv(0));
assert empty.length()==0 | true;
// Third frame is READY or else a client reply address
String client_addr = new String(backend.recv(0));
// If client reply, send rest back to frontend
if (!client_addr.equals("READY")) {
empty = new String(backend.recv(0));
assert empty.length()==0 | true;
String reply = new String(backend.recv(0));
frontend.send(client_addr.getBytes(), ZMQ.SNDMORE);
frontend.send("".getBytes(), ZMQ.SNDMORE);
frontend.send(reply.getBytes(), 0);
if (--client_nbr == 0)
break;
}
}
if (items.pollin(1)) {
// Now get next client request, route to LRU worker
// Client request is [address][empty][request]
String client_addr = new String(frontend.recv(0));
String empty = new String(frontend.recv(0));
assert empty.length()==0 | true;
String request = new String(frontend.recv(0));
String worker_addr = worker_queue.poll();//worker_queue [0];
backend.send(worker_addr.getBytes(), ZMQ.SNDMORE);
backend.send("".getBytes(), ZMQ.SNDMORE);
backend.send(client_addr.getBytes(), ZMQ.SNDMORE);
backend.send("".getBytes(), ZMQ.SNDMORE);
backend.send(request.getBytes(), 0);
}
}
frontend.close();
backend.close();
context.term();
System.exit(0);
}
}
没有合适的资源?快使用搜索试试~ 我知道了~
资源推荐
资源详情
资源评论
收起资源包目录
zeromqTest.rar (64个子文件)
zeromqTest
.project 386B
src
com
jzmq
test
taskwork.java 1KB
syncsub.java 1KB
lpclient.java 3KB
msreader.java 1KB
rtdealer.java 3KB
durapub.java 877B
tasksink.java 1KB
syncpub.java 1KB
rrclient.java 922B
wuserver.java 1014B
msgqueue.java 853B
rrbroker.java 2KB
version.java 321B
hwserver.java 2KB
lruqueue.java 5KB
taskwork2.java 1KB
mtrelay.java 2KB
durapub2.java 1010B
lpserver.java 2KB
taskvent.java 2KB
psenvsub.java 635B
durasub.java 902B
wuproxy.java 1KB
rrserver.java 1KB
mspoller.java 1KB
wuclient.java 1KB
hwclient.java 2KB
psenvpub.java 727B
lib
.settings
org.eclipse.jdt.core.prefs 629B
.classpath 367B
bin
com
jzmq
test
rtdealer.class 2KB
rrserver.class 2KB
wuserver.class 1KB
tasksink.class 2KB
psenvpub.class 1KB
taskwork2.class 2KB
durapub2.class 1KB
ClientThread.class 2KB
hwclient.class 2KB
psenvsub.class 1KB
version.class 884B
taskwork.class 2KB
mspoller.class 1KB
taskvent.class 2KB
rrbroker.class 2KB
syncpub.class 2KB
syncsub.class 2KB
mtrelay$1.class 1KB
msgqueue.class 1KB
durapub.class 1KB
rrclient.class 2KB
lpserver.class 2KB
lpclient.class 3KB
wuclient.class 2KB
msreader.class 1KB
rtdealer$Worker.class 2KB
hwserver.class 2KB
WorkerThread.class 2KB
mtrelay.class 1KB
wuproxy.class 1KB
mtrelay$1$1.class 1016B
lruqueue.class 3KB
durasub.class 1KB
共 64 条
- 1
资源评论
- sweet6hero2014-09-29适合初学者。多谢共享!
- mytechssl2013-01-16鸡肋,没有什么用
- husong_1232015-01-14例子很好,浅显易懂适合初学者。
- conan81262013-05-13比较简单的例子,适合初学者。多谢共享!
wh351531104
- 粉丝: 18
- 资源: 68
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功