Пожалуйста помогите реализовать broadcast join на хадупе!

Пожалуйста помогите реализовать broadcast join на хадупе!

Обычно для реализации такого джойна используют распределенный кеш: кладем в кеш меньшую таблицу, а затем на каждом маппере считываем всю таблицу в хэш мап и затем при обработке просто ищем нужную запись в мапе

Например, у нас есть две таблицы authors и books, причем authors меньше и помещается в память. Кладем ее в распределенный кеш с помощью класса DistributedCache:

...
DistributedCache.addCacheFile(authors.toUri(), conf);

Затем в маппере считываем таблицу из кеша:

private final Map<String, String> index = new HashMap<String, String>();

@Override
protected void setup(Context ctx) throws IOException,
        InterruptedException {
    super.setup(ctx);
    Configuration config = ctx.getConfiguration();
    FileSystem fs = FileSystem.get(config);

    Path[] cacheFilesLocal = DistributedCache.getLocalCacheFiles(config);
    for (Path path : cacheFilesLocal) {
        processAuthors(fs, path);
    }
}

В processAuthors мы просто кладем все содержимое файла в мап (файлов может быть несколько)

private void processAuthors(FileSystem fs, Path path) throws IOException {
    FSDataInputStream is = fs.open(path);
    List<String> lines = IOUtils.readLines(is);
    for (String line : lines) {
        String[] split = line.split("\t");
        index.put(split[0], split[1]);
    }
}

В методе map мы просто проверяем, есть ли запись в мапе index, и если есть, достаем ее оттуда:

@Override
protected void map(Object key, Text line, Context ctx) throws IOException, InterruptedException {
    String stringLine = line.toString();
    String[] split = stringLine.split("\t");

    String id = split[0];
    if (!index.containsKey(id)) {
        return;
    }

    String author = index.get(id);
    String bookTitle = split[2];
    String year = split[1];

    ctx.write(new Text(author), new Text(bookTitle + '\t' + year));
}

Если интересует полный пример, можно посмотреть тут

Обычно для реализации такого джойна используют распределенный кеш: кладем в кеш меньшую таблицу, а затем на каждом маппере считываем всю таблицу в хэш мап и затем при обработке просто ищем нужную запись в мапе Например, у нас есть две таблицы authors и books, причем authors меньше и помещается в память. Кладем ее в распределенный кеш с помощью класса `DistributedCache`: ``` ... DistributedCache.addCacheFile(authors.toUri(), conf); ``` Затем в маппере считываем таблицу из кеша: private final Map&lt;String, String&gt; index = new HashMap&lt;String, String&gt;(); @Override protected void setup(Context ctx) throws IOException, InterruptedException { super.setup(ctx); Configuration config = ctx.getConfiguration(); FileSystem fs = FileSystem.get(config); Path[] cacheFilesLocal = DistributedCache.getLocalCacheFiles(config); for (Path path : cacheFilesLocal) { processAuthors(fs, path); } } В `processAuthors` мы просто кладем все содержимое файла в мап (файлов может быть несколько) private void processAuthors(FileSystem fs, Path path) throws IOException { FSDataInputStream is = fs.open(path); List&lt;String&gt; lines = IOUtils.readLines(is); for (String line : lines) { String[] split = line.split("\t"); index.put(split[0], split[1]); } } В методе `map` мы просто проверяем, есть ли запись в мапе `index`, и если есть, достаем ее оттуда: @Override protected void map(Object key, Text line, Context ctx) throws IOException, InterruptedException { String stringLine = line.toString(); String[] split = stringLine.split("\t"); String id = split[0]; if (!index.containsKey(id)) { return; } String author = index.get(id); String bookTitle = split[2]; String year = split[1]; ctx.write(new Text(author), new Text(bookTitle + '\t' + year)); } Если интересует полный пример, можно посмотреть [тут](https://github.com/alexeygrigorev/aim3/blob/master/src/main/java/de/tuberlin/dima/aim3/assignment1/BookAndAuthorBroadcastJoin.java "тут")
88
просмотров
1
ответов
2
подписчики
Предпросмотр
введите как минимим 10 characters
WARNING: You mentioned %MENTIONS%, but they cannot see this message and will not be notified
Сохраняю...
Сохранено
Все темы будут удалено ?
Сохранены неопубликованные черновики. Нажмите для продолжения редактирования
Discard draft