Initial commit
This commit is contained in:
5
src/main/java/fr/istic/date/EnvoyerDate.java
Normal file
5
src/main/java/fr/istic/date/EnvoyerDate.java
Normal file
@@ -0,0 +1,5 @@
|
||||
package fr.istic.date;
|
||||
|
||||
public class EnvoyerDate {
|
||||
|
||||
}
|
||||
5
src/main/java/fr/istic/date/RecevoirDate.java
Normal file
5
src/main/java/fr/istic/date/RecevoirDate.java
Normal file
@@ -0,0 +1,5 @@
|
||||
package fr.istic.date;
|
||||
|
||||
public class RecevoirDate {
|
||||
|
||||
}
|
||||
28
src/main/java/rabbitmq/tuto1/Recv.java
Normal file
28
src/main/java/rabbitmq/tuto1/Recv.java
Normal file
@@ -0,0 +1,28 @@
|
||||
package rabbitmq.tuto1;
|
||||
|
||||
import com.rabbitmq.client.Channel;
|
||||
import com.rabbitmq.client.Connection;
|
||||
import com.rabbitmq.client.ConnectionFactory;
|
||||
import com.rabbitmq.client.DeliverCallback;
|
||||
|
||||
public class Recv {
|
||||
|
||||
private final static String QUEUE_NAME = "hello";
|
||||
|
||||
public static void main(String[] argv) throws Exception {
|
||||
ConnectionFactory factory = new ConnectionFactory();
|
||||
factory.setHost("localhost");
|
||||
Connection connection = factory.newConnection();
|
||||
Channel channel = connection.createChannel();
|
||||
|
||||
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
|
||||
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
|
||||
|
||||
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
|
||||
String message = new String(delivery.getBody(), "UTF-8");
|
||||
System.out.println(" [x] Received '" + message + "'");
|
||||
};
|
||||
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {
|
||||
});
|
||||
}
|
||||
}
|
||||
22
src/main/java/rabbitmq/tuto1/Send.java
Normal file
22
src/main/java/rabbitmq/tuto1/Send.java
Normal file
@@ -0,0 +1,22 @@
|
||||
package rabbitmq.tuto1;
|
||||
|
||||
import com.rabbitmq.client.Channel;
|
||||
import com.rabbitmq.client.Connection;
|
||||
import com.rabbitmq.client.ConnectionFactory;
|
||||
|
||||
public class Send {
|
||||
|
||||
private final static String QUEUE_NAME = "hello";
|
||||
|
||||
public static void main(String[] argv) throws Exception {
|
||||
ConnectionFactory factory = new ConnectionFactory();
|
||||
factory.setHost("localhost");
|
||||
try (Connection connection = factory.newConnection();
|
||||
Channel channel = connection.createChannel()) {
|
||||
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
|
||||
String message = "Hello World!";
|
||||
channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
|
||||
System.out.println(" [x] Sent '" + message + "'");
|
||||
}
|
||||
}
|
||||
}
|
||||
27
src/main/java/rabbitmq/tuto2/NewTask.java
Normal file
27
src/main/java/rabbitmq/tuto2/NewTask.java
Normal file
@@ -0,0 +1,27 @@
|
||||
package rabbitmq.tuto2;
|
||||
|
||||
import com.rabbitmq.client.Channel;
|
||||
import com.rabbitmq.client.Connection;
|
||||
import com.rabbitmq.client.ConnectionFactory;
|
||||
import com.rabbitmq.client.MessageProperties;
|
||||
|
||||
public class NewTask {
|
||||
|
||||
private static final String TASK_QUEUE_NAME = "task_queue";
|
||||
|
||||
public static void main(String[] argv) throws Exception {
|
||||
ConnectionFactory factory = new ConnectionFactory();
|
||||
factory.setHost("localhost");
|
||||
try (Connection connection = factory.newConnection();
|
||||
Channel channel = connection.createChannel()) {
|
||||
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
|
||||
|
||||
String message = String.join(" ", argv);
|
||||
|
||||
channel.basicPublish("", TASK_QUEUE_NAME,
|
||||
MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));
|
||||
System.out.println(" [x] Sent '" + message + "'");
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
50
src/main/java/rabbitmq/tuto2/Worker.java
Normal file
50
src/main/java/rabbitmq/tuto2/Worker.java
Normal file
@@ -0,0 +1,50 @@
|
||||
package rabbitmq.tuto2;
|
||||
|
||||
import com.rabbitmq.client.Channel;
|
||||
import com.rabbitmq.client.Connection;
|
||||
import com.rabbitmq.client.ConnectionFactory;
|
||||
import com.rabbitmq.client.DeliverCallback;
|
||||
|
||||
public class Worker {
|
||||
|
||||
private static final String TASK_QUEUE_NAME = "task_queue";
|
||||
|
||||
public static void main(String[] argv) throws Exception {
|
||||
ConnectionFactory factory = new ConnectionFactory();
|
||||
factory.setHost("localhost");
|
||||
final Connection connection = factory.newConnection();
|
||||
final Channel channel = connection.createChannel();
|
||||
|
||||
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
|
||||
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
|
||||
|
||||
channel.basicQos(1);
|
||||
|
||||
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
|
||||
String message = new String(delivery.getBody(), "UTF-8");
|
||||
|
||||
System.out.println(" [x] Received '" + message + "'");
|
||||
try {
|
||||
doWork(message);
|
||||
} finally {
|
||||
System.out.println(" [x] Done");
|
||||
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
|
||||
}
|
||||
};
|
||||
channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback,
|
||||
consumerTag -> {
|
||||
});
|
||||
}
|
||||
|
||||
private static void doWork(String task) {
|
||||
for (char ch : task.toCharArray()) {
|
||||
if (ch == '.') {
|
||||
try {
|
||||
Thread.sleep(1000);
|
||||
} catch (InterruptedException _ignored) {
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
26
src/main/java/rabbitmq/tuto3/EmitLog.java
Normal file
26
src/main/java/rabbitmq/tuto3/EmitLog.java
Normal file
@@ -0,0 +1,26 @@
|
||||
package rabbitmq.tuto3;
|
||||
|
||||
import com.rabbitmq.client.Channel;
|
||||
import com.rabbitmq.client.Connection;
|
||||
import com.rabbitmq.client.ConnectionFactory;
|
||||
|
||||
public class EmitLog {
|
||||
|
||||
private static final String EXCHANGE_NAME = "logs";
|
||||
|
||||
public static void main(String[] argv) throws Exception {
|
||||
ConnectionFactory factory = new ConnectionFactory();
|
||||
factory.setHost("localhost");
|
||||
try (Connection connection = factory.newConnection();
|
||||
Channel channel = connection.createChannel()) {
|
||||
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
|
||||
|
||||
String message = argv.length < 1 ? "info: Hello World!"
|
||||
: String.join(" ", argv);
|
||||
|
||||
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
|
||||
System.out.println(" [x] Sent '" + message + "'");
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
30
src/main/java/rabbitmq/tuto3/ReceiveLogs.java
Normal file
30
src/main/java/rabbitmq/tuto3/ReceiveLogs.java
Normal file
@@ -0,0 +1,30 @@
|
||||
package rabbitmq.tuto3;
|
||||
|
||||
import com.rabbitmq.client.Channel;
|
||||
import com.rabbitmq.client.Connection;
|
||||
import com.rabbitmq.client.ConnectionFactory;
|
||||
import com.rabbitmq.client.DeliverCallback;
|
||||
|
||||
public class ReceiveLogs {
|
||||
private static final String EXCHANGE_NAME = "logs";
|
||||
|
||||
public static void main(String[] argv) throws Exception {
|
||||
ConnectionFactory factory = new ConnectionFactory();
|
||||
factory.setHost("localhost");
|
||||
Connection connection = factory.newConnection();
|
||||
Channel channel = connection.createChannel();
|
||||
|
||||
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
|
||||
String queueName = channel.queueDeclare().getQueue();
|
||||
channel.queueBind(queueName, EXCHANGE_NAME, "");
|
||||
|
||||
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
|
||||
|
||||
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
|
||||
String message = new String(delivery.getBody(), "UTF-8");
|
||||
System.out.println(" [x] Received '" + message + "'");
|
||||
};
|
||||
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
|
||||
});
|
||||
}
|
||||
}
|
||||
57
src/main/java/rabbitmq/tuto4/EmitLogDirect.java
Normal file
57
src/main/java/rabbitmq/tuto4/EmitLogDirect.java
Normal file
@@ -0,0 +1,57 @@
|
||||
package rabbitmq.tuto4;
|
||||
|
||||
import com.rabbitmq.client.BuiltinExchangeType;
|
||||
import com.rabbitmq.client.Channel;
|
||||
import com.rabbitmq.client.Connection;
|
||||
import com.rabbitmq.client.ConnectionFactory;
|
||||
|
||||
public class EmitLogDirect {
|
||||
|
||||
private static final String EXCHANGE_NAME = "direct_logs";
|
||||
|
||||
public static void main(String[] argv) throws Exception {
|
||||
ConnectionFactory factory = new ConnectionFactory();
|
||||
factory.setHost("localhost");
|
||||
try (Connection connection = factory.newConnection();
|
||||
Channel channel = connection.createChannel()) {
|
||||
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
|
||||
|
||||
String severity = getSeverity(argv);
|
||||
String message = getMessage(argv);
|
||||
|
||||
channel.basicPublish(EXCHANGE_NAME, severity, null,
|
||||
message.getBytes("UTF-8"));
|
||||
System.out.println(" [x] Sent '" + severity + "':'" + message + "'");
|
||||
}
|
||||
}
|
||||
|
||||
private static String getSeverity(String[] strings) {
|
||||
if (strings.length < 1) {
|
||||
return "info";
|
||||
}
|
||||
return strings[0];
|
||||
}
|
||||
|
||||
private static String getMessage(String[] strings) {
|
||||
if (strings.length < 2) {
|
||||
return "Hello World!";
|
||||
}
|
||||
return joinStrings(strings, " ", 1);
|
||||
}
|
||||
|
||||
private static String joinStrings(String[] strings, String delimiter,
|
||||
int startIndex) {
|
||||
int length = strings.length;
|
||||
if (length == 0) {
|
||||
return "";
|
||||
}
|
||||
if (length < startIndex) {
|
||||
return "";
|
||||
}
|
||||
StringBuilder words = new StringBuilder(strings[startIndex]);
|
||||
for (int i = startIndex + 1; i < length; i++) {
|
||||
words.append(delimiter).append(strings[i]);
|
||||
}
|
||||
return words.toString();
|
||||
}
|
||||
}
|
||||
36
src/main/java/rabbitmq/tuto4/ReceiveLogsDirect.java
Normal file
36
src/main/java/rabbitmq/tuto4/ReceiveLogsDirect.java
Normal file
@@ -0,0 +1,36 @@
|
||||
package rabbitmq.tuto4;
|
||||
|
||||
import com.rabbitmq.client.*;
|
||||
|
||||
public class ReceiveLogsDirect {
|
||||
|
||||
private static final String EXCHANGE_NAME = "direct_logs";
|
||||
|
||||
public static void main(String[] argv) throws Exception {
|
||||
ConnectionFactory factory = new ConnectionFactory();
|
||||
factory.setHost("localhost");
|
||||
Connection connection = factory.newConnection();
|
||||
Channel channel = connection.createChannel();
|
||||
|
||||
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
|
||||
String queueName = channel.queueDeclare().getQueue();
|
||||
|
||||
if (argv.length < 1) {
|
||||
System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]");
|
||||
System.exit(1);
|
||||
}
|
||||
|
||||
for (String severity : argv) {
|
||||
channel.queueBind(queueName, EXCHANGE_NAME, severity);
|
||||
}
|
||||
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
|
||||
|
||||
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
|
||||
String message = new String(delivery.getBody(), "UTF-8");
|
||||
System.out.println(" [x] Received '"
|
||||
+ delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
|
||||
};
|
||||
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
|
||||
});
|
||||
}
|
||||
}
|
||||
57
src/main/java/rabbitmq/tuto5/EmitLogTopic.java
Normal file
57
src/main/java/rabbitmq/tuto5/EmitLogTopic.java
Normal file
@@ -0,0 +1,57 @@
|
||||
package rabbitmq.tuto5;
|
||||
|
||||
import com.rabbitmq.client.Channel;
|
||||
import com.rabbitmq.client.Connection;
|
||||
import com.rabbitmq.client.ConnectionFactory;
|
||||
|
||||
public class EmitLogTopic {
|
||||
|
||||
private static final String EXCHANGE_NAME = "topic_logs";
|
||||
|
||||
public static void main(String[] argv) throws Exception {
|
||||
ConnectionFactory factory = new ConnectionFactory();
|
||||
factory.setHost("localhost");
|
||||
try (Connection connection = factory.newConnection();
|
||||
Channel channel = connection.createChannel()) {
|
||||
|
||||
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
|
||||
|
||||
String routingKey = getRouting(argv);
|
||||
String message = getMessage(argv);
|
||||
|
||||
channel.basicPublish(EXCHANGE_NAME, routingKey, null,
|
||||
message.getBytes("UTF-8"));
|
||||
System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");
|
||||
}
|
||||
}
|
||||
|
||||
private static String getRouting(String[] strings) {
|
||||
if (strings.length < 1) {
|
||||
return "anonymous.info";
|
||||
}
|
||||
return strings[0];
|
||||
}
|
||||
|
||||
private static String getMessage(String[] strings) {
|
||||
if (strings.length < 2) {
|
||||
return "Hello World!";
|
||||
}
|
||||
return joinStrings(strings, " ", 1);
|
||||
}
|
||||
|
||||
private static String joinStrings(String[] strings, String delimiter,
|
||||
int startIndex) {
|
||||
int length = strings.length;
|
||||
if (length == 0) {
|
||||
return "";
|
||||
}
|
||||
if (length < startIndex) {
|
||||
return "";
|
||||
}
|
||||
StringBuilder words = new StringBuilder(strings[startIndex]);
|
||||
for (int i = startIndex + 1; i < length; i++) {
|
||||
words.append(delimiter).append(strings[i]);
|
||||
}
|
||||
return words.toString();
|
||||
}
|
||||
}
|
||||
40
src/main/java/rabbitmq/tuto5/ReceiveLogsTopic.java
Normal file
40
src/main/java/rabbitmq/tuto5/ReceiveLogsTopic.java
Normal file
@@ -0,0 +1,40 @@
|
||||
package rabbitmq.tuto5;
|
||||
|
||||
import com.rabbitmq.client.Channel;
|
||||
import com.rabbitmq.client.Connection;
|
||||
import com.rabbitmq.client.ConnectionFactory;
|
||||
import com.rabbitmq.client.DeliverCallback;
|
||||
|
||||
public class ReceiveLogsTopic {
|
||||
|
||||
private static final String EXCHANGE_NAME = "topic_logs";
|
||||
|
||||
public static void main(String[] argv) throws Exception {
|
||||
ConnectionFactory factory = new ConnectionFactory();
|
||||
factory.setHost("localhost");
|
||||
Connection connection = factory.newConnection();
|
||||
Channel channel = connection.createChannel();
|
||||
|
||||
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
|
||||
String queueName = channel.queueDeclare().getQueue();
|
||||
|
||||
if (argv.length < 1) {
|
||||
System.err.println("Usage: ReceiveLogsTopic [binding_key]...");
|
||||
System.exit(1);
|
||||
}
|
||||
|
||||
for (String bindingKey : argv) {
|
||||
channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
|
||||
}
|
||||
|
||||
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
|
||||
|
||||
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
|
||||
String message = new String(delivery.getBody(), "UTF-8");
|
||||
System.out.println(" [x] Received '"
|
||||
+ delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
|
||||
};
|
||||
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
|
||||
});
|
||||
}
|
||||
}
|
||||
70
src/main/java/rabbitmq/tuto6/RPCClient.java
Normal file
70
src/main/java/rabbitmq/tuto6/RPCClient.java
Normal file
@@ -0,0 +1,70 @@
|
||||
package rabbitmq.tuto6;
|
||||
|
||||
import com.rabbitmq.client.AMQP;
|
||||
import com.rabbitmq.client.Channel;
|
||||
import com.rabbitmq.client.Connection;
|
||||
import com.rabbitmq.client.ConnectionFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.ArrayBlockingQueue;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
public class RPCClient implements AutoCloseable {
|
||||
|
||||
private Connection connection;
|
||||
private Channel channel;
|
||||
private String requestQueueName = "rpc_queue";
|
||||
|
||||
public RPCClient() throws IOException, TimeoutException {
|
||||
ConnectionFactory factory = new ConnectionFactory();
|
||||
factory.setHost("localhost");
|
||||
|
||||
connection = factory.newConnection();
|
||||
channel = connection.createChannel();
|
||||
}
|
||||
|
||||
public static void main(String[] argv) {
|
||||
try (RPCClient fibonacciRpc = new RPCClient()) {
|
||||
for (int i = 0; i < 32; i++) {
|
||||
String i_str = Integer.toString(i);
|
||||
System.out.println(" [x] Requesting fib(" + i_str + ")");
|
||||
String response = fibonacciRpc.call(i_str);
|
||||
System.out.println(" [.] Got '" + response + "'");
|
||||
}
|
||||
} catch (IOException | TimeoutException | InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
public String call(String message) throws IOException, InterruptedException {
|
||||
final String corrId = UUID.randomUUID().toString();
|
||||
|
||||
String replyQueueName = channel.queueDeclare().getQueue();
|
||||
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
|
||||
.correlationId(corrId).replyTo(replyQueueName).build();
|
||||
|
||||
channel.basicPublish("", requestQueueName, props,
|
||||
message.getBytes("UTF-8"));
|
||||
|
||||
final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);
|
||||
|
||||
String ctag = channel.basicConsume(replyQueueName, true,
|
||||
(consumerTag, delivery) -> {
|
||||
if (delivery.getProperties().getCorrelationId().equals(corrId)) {
|
||||
response.offer(new String(delivery.getBody(), "UTF-8"));
|
||||
}
|
||||
}, consumerTag -> {
|
||||
});
|
||||
|
||||
String result = response.take();
|
||||
channel.basicCancel(ctag);
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
connection.close();
|
||||
}
|
||||
}
|
||||
74
src/main/java/rabbitmq/tuto6/RPCServer.java
Normal file
74
src/main/java/rabbitmq/tuto6/RPCServer.java
Normal file
@@ -0,0 +1,74 @@
|
||||
package rabbitmq.tuto6;
|
||||
|
||||
import com.rabbitmq.client.*;
|
||||
|
||||
public class RPCServer {
|
||||
|
||||
private static final String RPC_QUEUE_NAME = "rpc_queue";
|
||||
|
||||
private static int fib(int n) {
|
||||
if (n == 0) {
|
||||
return 0;
|
||||
}
|
||||
if (n == 1) {
|
||||
return 1;
|
||||
}
|
||||
return fib(n - 1) + fib(n - 2);
|
||||
}
|
||||
|
||||
public static void main(String[] argv) throws Exception {
|
||||
ConnectionFactory factory = new ConnectionFactory();
|
||||
factory.setHost("localhost");
|
||||
|
||||
try (Connection connection = factory.newConnection();
|
||||
Channel channel = connection.createChannel()) {
|
||||
channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
|
||||
channel.queuePurge(RPC_QUEUE_NAME);
|
||||
|
||||
channel.basicQos(1);
|
||||
|
||||
System.out.println(" [x] Awaiting RPC requests");
|
||||
|
||||
Object monitor = new Object();
|
||||
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
|
||||
AMQP.BasicProperties replyProps = new AMQP.BasicProperties.Builder()
|
||||
.correlationId(delivery.getProperties().getCorrelationId()).build();
|
||||
|
||||
String response = "";
|
||||
|
||||
try {
|
||||
String message = new String(delivery.getBody(), "UTF-8");
|
||||
int n = Integer.parseInt(message);
|
||||
|
||||
System.out.println(" [.] fib(" + message + ")");
|
||||
response += fib(n);
|
||||
} catch (RuntimeException e) {
|
||||
System.out.println(" [.] " + e.toString());
|
||||
} finally {
|
||||
channel.basicPublish("", delivery.getProperties().getReplyTo(),
|
||||
replyProps, response.getBytes("UTF-8"));
|
||||
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
|
||||
// RabbitMq consumer worker thread notifies the RPC server owner
|
||||
// thread
|
||||
synchronized (monitor) {
|
||||
monitor.notify();
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
channel.basicConsume(RPC_QUEUE_NAME, false, deliverCallback,
|
||||
(consumerTag -> {
|
||||
}));
|
||||
// Wait and be prepared to consume the message from RPC client.
|
||||
while (true) {
|
||||
synchronized (monitor) {
|
||||
try {
|
||||
monitor.wait();
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user