/**
 * Converts a batch of {@link RawData} records into {@link ProcessedData} records, running one
 * task per record on a fixed-size thread pool.
 */
public class DataProcessingPipeline {

    /**
     * Processes every record of the dataset and returns the results in input order.
     *
     * <p>A thread pool sized to the number of available processors is created for this call
     * and shut down before the method returns.
     *
     * @param rawDataset the records to process
     * @return one {@link ProcessedData} per input record, in the same order as the input
     * @throws RuntimeException if a task fails or the calling thread is interrupted while
     *     waiting for the tasks; the underlying exception is attached as the cause
     */
    public List<ProcessedData> processDataset(List<RawData> rawDataset) {
        ExecutorService executor = Executors.newFixedThreadPool(
                Runtime.getRuntime().availableProcessors());

        List<Callable<ProcessedData>> processingTasks = rawDataset.stream()
                .map(this::createProcessingTask)
                .collect(Collectors.toList());

        try {
            // invokeAll blocks until every task has completed and returns the futures in the
            // same order as the submitted tasks, so the result order matches the input order.
            List<Future<ProcessedData>> futures = executor.invokeAll(processingTasks);

            List<ProcessedData> processedDataset = new ArrayList<>(futures.size());
            for (Future<ProcessedData> future : futures) {
                processedDataset.add(future.get());
            }
            return processedDataset;
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up the stack can still observe it.
            Thread.currentThread().interrupt();
            throw new RuntimeException("Data processing failed", e);
        } catch (ExecutionException e) {
            throw new RuntimeException("Data processing failed", e);
        } finally {
            executor.shutdown();
        }
    }

    /**
     * Builds the task that converts a single record.
     *
     * @param rawData the record to convert
     * @return a task producing the processed form of {@code rawData}
     */
    private Callable<ProcessedData> createProcessingTask(RawData rawData) {
        return () -> new ProcessedData(
                rawData.getId(),
                transformData(rawData.getValue()),
                calculateComplexity(rawData));
    }

    /**
     * Upper-cases the input.
     *
     * @param input the value to transform
     * @return {@code input} in upper case
     */
    private String transformData(String input) {
        // Locale.ROOT keeps the result independent of the JVM's default locale.
        return input.toUpperCase(java.util.Locale.ROOT);
    }

    /**
     * Computes the complexity score of a record, defined here as the length of its value.
     *
     * @param rawData the record to score
     * @return the number of characters in the record's value
     */
    private int calculateComplexity(RawData rawData) {
        return rawData.getValue().length();
    }

    /** Input record: an identifier and the unprocessed value. */
    public static class RawData {
        private final String id;
        private final String value;

        /**
         * @param id the record identifier
         * @param value the unprocessed value
         */
        public RawData(String id, String value) {
            this.id = id;
            this.value = value;
        }

        /** @return the record identifier */
        public String getId() {
            return id;
        }

        /** @return the unprocessed value */
        public String getValue() {
            return value;
        }
    }

    /** Output record: the identifier, the transformed value and the complexity score. */
    public static class ProcessedData {
        private final String id;
        private final String processedValue;
        private final int complexity;

        /**
         * @param id the identifier copied from the source record
         * @param processedValue the transformed value
         * @param complexity the complexity score
         */
        public ProcessedData(String id, String processedValue, int complexity) {
            this.id = id;
            this.processedValue = processedValue;
            this.complexity = complexity;
        }

        /** @return the identifier copied from the source record */
        public String getId() {
            return id;
        }

        /** @return the transformed value */
        public String getProcessedValue() {
            return processedValue;
        }

        /** @return the complexity score */
        public int getComplexity() {
            return complexity;
        }

        @Override
        public String toString() {
            return "ProcessedData{" +
                    "id='" + id + '\'' +
                    ", processedValue='" + processedValue + '\'' +
                    ", complexity=" + complexity +
                    '}';
        }
    }
}
/**
 * Fetches a batch of URLs concurrently with an overall time limit per batch.
 *
 * <p>Each call to {@link #crawlUrls(List)} uses its own thread pool, so one instance can be
 * used for any number of batches.
 */
public class WebCrawler {
    private final int threadCount;
    private final int timeoutSeconds;

    /**
     * @param threadCount number of worker threads used per batch; must be positive
     * @param timeoutSeconds time limit, in seconds, applied to each batch as a whole
     * @throws IllegalArgumentException if {@code threadCount} is not positive
     */
    public WebCrawler(int threadCount, int timeoutSeconds) {
        // Fail at construction time, as creating the pool here used to do.
        if (threadCount <= 0) {
            throw new IllegalArgumentException("threadCount must be positive: " + threadCount);
        }
        this.threadCount = threadCount;
        this.timeoutSeconds = timeoutSeconds;
    }

    /**
     * Fetches the given URLs and returns the pages that were fetched within the time limit.
     *
     * <p>URLs whose fetch fails or does not finish before the timeout are reported on
     * {@code System.err} and left out of the result, so the result may be shorter than the
     * input. Successful pages keep the relative order of their URLs.
     *
     * @param urls the URLs to fetch
     * @return the successfully fetched pages
     * @throws RuntimeException if the calling thread is interrupted while waiting; the
     *     interrupt flag is restored before the exception is thrown
     */
    public List<WebPage> crawlUrls(List<String> urls) {
        // Snapshot the input so futures can be matched to their URL by index below.
        List<String> targets = new ArrayList<>(urls);
        List<Callable<WebPage>> crawlingTasks = targets.stream()
                .map(this::createCrawlingTask)
                .collect(Collectors.toList());

        // A pool per call: shutting down a shared pool here would make the instance
        // unusable after its first batch.
        ExecutorService executor = Executors.newFixedThreadPool(threadCount);
        try {
            // Futures come back in task order; tasks still running at the timeout are cancelled.
            List<Future<WebPage>> futures = executor.invokeAll(
                    crawlingTasks,
                    timeoutSeconds,
                    TimeUnit.SECONDS);

            List<WebPage> crawledPages = new ArrayList<>();
            for (int i = 0; i < futures.size(); i++) {
                Future<WebPage> future = futures.get(i);
                if (future.isCancelled()) {
                    System.err.println("Crawling timed out: " + targets.get(i));
                    continue;
                }
                try {
                    crawledPages.add(future.get());
                } catch (ExecutionException e) {
                    // One failed URL must not fail the whole batch.
                    System.err.println("Crawling failed: " + e.getMessage());
                }
            }
            return crawledPages;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Web crawling interrupted", e);
        } finally {
            executor.shutdown();
        }
    }

    /**
     * Builds the task that fetches a single URL, logging any failure before rethrowing it.
     *
     * @param url the URL to fetch
     * @return a task producing the fetched page
     */
    private Callable<WebPage> createCrawlingTask(String url) {
        return () -> {
            try {
                return fetchWebPage(url);
            } catch (Exception e) {
                System.err.println("Error crawling " + url + ": " + e.getMessage());
                throw e;
            }
        };
    }

    /**
     * Simulates fetching a page: sleeps for a random time below three seconds and returns
     * placeholder content stamped with the current time.
     *
     * @param url the URL to fetch
     * @return the simulated page
     * @throws Exception if the fetch fails, including interruption of the sleep
     */
    private WebPage fetchWebPage(String url) throws Exception {
        Thread.sleep((long) (Math.random() * 3000));
        return new WebPage(
                url,
                "Content of " + url,
                System.currentTimeMillis());
    }

    /** A fetched page: its URL, its content and the time it was fetched. */
    public static class WebPage {
        private final String url;
        private final String content;
        private final long fetchTime;

        /**
         * @param url the page URL
         * @param content the page content
         * @param fetchTime the fetch timestamp in milliseconds
         */
        public WebPage(String url, String content, long fetchTime) {
            this.url = url;
            this.content = content;
            this.fetchTime = fetchTime;
        }

        /** @return the page URL */
        public String getUrl() {
            return url;
        }

        /** @return the page content */
        public String getContent() {
            return content;
        }

        /** @return the fetch timestamp in milliseconds */
        public long getFetchTime() {
            return fetchTime;
        }

        @Override
        public String toString() {
            return "WebPage{" +
                    "url='" + url + '\'' +
                    ", contentLength=" + content.length() +
                    ", fetchTime=" + fetchTime +
                    '}';
        }
    }
}
/**
 * Converts a batch of images to grayscale, processing one image per task on a fixed-size
 * thread pool.
 */
public class BatchImageProcessor {

    /**
     * Processes the given images and returns the ones that were processed successfully.
     *
     * <p>A thread pool sized to the number of available processors is created for this call
     * and shut down before the method returns. An image whose task fails is reported on
     * {@code System.err} and left out of the result, so the result may be shorter than the
     * input. Successful results keep the relative order of their source images.
     *
     * @param images the images to process
     * @return the successfully processed images
     * @throws RuntimeException if the calling thread is interrupted while waiting; the
     *     interrupt flag is restored before the exception is thrown
     */
    public List<ProcessedImage> processImages(List<BufferedImage> images) {
        ExecutorService executor = Executors.newFixedThreadPool(
                Runtime.getRuntime().availableProcessors());
        try {
            List<Callable<ProcessedImage>> processingTasks = images.stream()
                    .map(this::createImageProcessingTask)
                    .collect(Collectors.toList());

            // invokeAll blocks until all tasks are done and returns futures in task order.
            List<Future<ProcessedImage>> futures = executor.invokeAll(processingTasks);

            List<ProcessedImage> processedImages = new ArrayList<>(futures.size());
            for (Future<ProcessedImage> future : futures) {
                try {
                    processedImages.add(future.get());
                } catch (ExecutionException e) {
                    // One failed image must not fail the whole batch.
                    System.err.println("Image processing failed: " + e.getMessage());
                }
            }
            return processedImages;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Image processing interrupted", e);
        } finally {
            executor.shutdown();
        }
    }

    /**
     * Builds the task that converts one image: every pixel is passed through
     * {@link #processPixel(int)} and written to a new {@code TYPE_INT_RGB} image of the same
     * size. The source image is not modified.
     *
     * @param image the source image
     * @return a task producing the converted image together with its complexity score
     */
    private Callable<ProcessedImage> createImageProcessingTask(BufferedImage image) {
        return () -> {
            int width = image.getWidth();
            int height = image.getHeight();
            BufferedImage processedImage =
                    new BufferedImage(width, height, BufferedImage.TYPE_INT_RGB);
            for (int y = 0; y < height; y++) {
                for (int x = 0; x < width; x++) {
                    processedImage.setRGB(x, y, processPixel(image.getRGB(x, y)));
                }
            }
            return new ProcessedImage(processedImage, calculateImageComplexity(image));
        };
    }

    /**
     * Converts one packed RGB pixel to gray by averaging its red, green and blue channels.
     * Bits above the lowest 24 of the input are ignored.
     *
     * @param rgb the packed pixel value
     * @return a packed value whose red, green and blue channels all hold the average
     */
    private int processPixel(int rgb) {
        int r = (rgb >> 16) & 0xFF;
        int g = (rgb >> 8) & 0xFF;
        int b = rgb & 0xFF;
        int grayValue = (r + g + b) / 3;
        return (grayValue << 16) | (grayValue << 8) | grayValue;
    }

    /**
     * Computes the complexity score of an image, defined here as its pixel count.
     *
     * @param image the image to score
     * @return width multiplied by height
     */
    private int calculateImageComplexity(BufferedImage image) {
        return image.getWidth() * image.getHeight();
    }

    /** A converted image together with its complexity score. */
    public static class ProcessedImage {
        private final BufferedImage image;
        private final int complexity;

        /**
         * @param image the converted image
         * @param complexity the complexity score
         */
        public ProcessedImage(BufferedImage image, int complexity) {
            this.image = image;
            this.complexity = complexity;
        }

        /** @return the converted image */
        public BufferedImage getImage() {
            return image;
        }

        /** @return the complexity score */
        public int getComplexity() {
            return complexity;
        }

        @Override
        public String toString() {
            return "ProcessedImage{" +
                    "imageSize=" + image.getWidth() + "x" + image.getHeight() +
                    ", complexity=" + complexity +
                    '}';
        }
    }
}