public class AnalyticsDashboard {
// Thread-safe map to track event metrics
private ConcurrentHashMap metricTracker;
public AnalyticsDashboard() {
this.metricTracker = new ConcurrentHashMap<>();
}
// Atomic method to record event
public void recordEvent(String eventType) {
metricTracker.computeIfAbsent(eventType, k -> new LongAdder())
.increment();
}
// Thread-safe method to get event count
public long getEventCount(String eventType) {
LongAdder counter = metricTracker.get(eventType);
return counter != null ? counter.sum() : 0;
}
// Simulate distributed event tracking
public static void main(String[] args) {
AnalyticsDashboard dashboard = new AnalyticsDashboard();
// Different event types
String[] eventTypes = {
"user_login",
"page_view",
"button_click",
"purchase"
};
// Create thread pool for concurrent events
ExecutorService executor = Executors.newFixedThreadPool(10);
Random random = new Random();
// Simulate multiple threads recording events
for (int i = 0; i < 100; i++) {
executor.submit(() -> {
// Randomly select event type
String eventType = eventTypes[random.nextInt(eventTypes.length)];
dashboard.recordEvent(eventType);
});
}
// Shutdown executor and wait for completion
executor.shutdown();
try {
executor.awaitTermination(5, TimeUnit.SECONDS);
// Print final analytics
System.out.println("Analytics Dashboard Report:");
for (String eventType : eventTypes) {
System.out.println(eventType + ": " +
dashboard.getEventCount(eventType) + " events");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public class DistributedTaskProcessor {
// Thread-safe blocking queue for task distribution
private BlockingQueue taskQueue;
public DistributedTaskProcessor(int queueCapacity) {
this.taskQueue = new LinkedBlockingQueue<>(queueCapacity);
}
// Submit task to the queue
public void submitTask(Task task) throws InterruptedException {
taskQueue.put(task);
}
// Start worker threads
public void startWorkers(int workerCount) {
for (int i = 0; i < workerCount; i++) {
new Thread(new TaskWorker(), "Worker-" + (i + 1)).start();
}
}
// Worker thread implementation
private class TaskWorker implements Runnable {
@Override
public void run() {
while (!Thread.currentThread().isInterrupted()) {
try {
// Take task from queue, wait if empty
Task task = taskQueue.take();
processTask(task);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
private void processTask(Task task) {
System.out.println(
Thread.currentThread().getName() +
" processing: " + task
);
try {
// Simulate task processing
Thread.sleep(task.getProcessingTime());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
// Task model
public static class Task {
private String name;
private long processingTime;
public Task(String name, long processingTime) {
this.name = name;
this.processingTime = processingTime;
}
public long getProcessingTime() { return processingTime; }
@Override
public String toString() {
return "Task{name='" + name + "'}";
}
}
// Demonstration method
public static void main(String[] args) throws InterruptedException {
DistributedTaskProcessor processor =
new DistributedTaskProcessor(10);
// Start 3 worker threads
processor.startWorkers(3);
// Submit tasks
processor.submitTask(new Task("Task-1", 1000));
processor.submitTask(new Task("Task-2", 500));
processor.submitTask(new Task("Task-3", 750));
}
}
public class ConcurrentCacheSystem {
// Thread-safe cache using CopyOnWriteArrayList
private CopyOnWriteArrayList cache;
public ConcurrentCacheSystem() {
this.cache = new CopyOnWriteArrayList<>();
}
// Thread-safe cache entry addition
public void addEntry(String key, String value) {
CacheEntry newEntry = new CacheEntry(key, value);
// Remove existing entry with same key if present
cache.removeIf(entry -> entry.getKey().equals(key));
// Add new entry
cache.add(newEntry);
}
// Thread-safe cache lookup
public Optional getEntry(String key) {
return cache.stream()
.filter(entry -> entry.getKey().equals(key))
.map(CacheEntry::getValue)
.findFirst();
}
// Cache entry model
private static class CacheEntry {
private final String key;
private final String value;
public CacheEntry(String key, String value) {
this.key = key;
this.value = value;
}
public String getKey() { return key; }
public String getValue() { return value; }
}
// Demonstration method
public static void main(String[] args) {
ConcurrentCacheSystem cacheSystem = new ConcurrentCacheSystem();
// Create thread pool for concurrent cache operations
ExecutorService executor = Executors.newFixedThreadPool(5);
// Simulate multiple threads adding and reading cache entries
for (int i = 0; i < 10; i++) {
final int threadId = i;
executor.submit(() -> {
String key = "key-" + threadId;
String value = "value-" + threadId;
// Add cache entry
cacheSystem.addEntry(key, value);
// Read cache entry
Optional cachedValue = cacheSystem.getEntry(key);
cachedValue.ifPresent(v ->
System.out.println("Thread " + threadId +
" - Cached: " + key + " = " + v)
);
});
}
// Shutdown executor
executor.shutdown();
try {
executor.awaitTermination(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}