websitelytics

Menu

Автоматическая загрузка json данных из файла с помощью Cloud Functions в BigQuery

Опубликовано: 07 июл 2019

Предположим, что вам нужно регулярно, например на ежедневной основе, загружать какие-то данные в BigQuery, и вы хотите сделать этот процесс автоматическим. Один из способов решения данной задачи — это автоматический экспорт данных json (новая строка разделитель) при загрузке соответствующего файла в Google Cloud Storage в какую-нибудь определенную папку вашего проекта с помощью Cloud Functions.

Хороший юскейс для подобной задачи - это построение Сквозной аналитики на базе BigQuery

Особенность Cloud Functions заключается в том, что это в некотором смысле квинтэссенция serverless-решений, где для выполнения определенных полезных действий по выбранному вами триггеру не нужен ни север, ни API, ни процесс авторизации, если все происходит в рамках одного проекта, ни секретные ключи, т. е. в общем-то ничего, кроме нескольких строчек кода самой функции, и пары строчек кода в файле конфигурации зависимостей package.json.

В данном примере, мы напишем функцию на javascript для окружения Node.js, но последовательность действий будет общая и для других языков и окружений.

1. В выбранном или новом проекте в Google Cloud Platform включем BigQuery API и Functions API для данного проекта.

Если ваш проект, к примеру, называется jsonuploading, то это можно будет сделать по ссылке console.cloud.google.com/apis/dashboard?project=jsonuploading

2. Создаем в Google Cloud Storage для проекта новый Bucket.

Затем соответственно вам нужно в Google Cloud Storage для этого же проекта создать bucket, т. е. папку, в которую вы будете загружать json файлы, и которая будет привязана в вашей функции. Сделать это можно по ссылке console.cloud.google.com/storage/create-bucket?project=jsonuploading

При конфигурировании новой папки для параметра storage class можно выбрать Multi-Regional, а для Access control model соответственно Set permissions uniformly at bucket-level.

3. Создаем Dataset

В BigQuery в выбранном проекте создаем, если еще не сделали этого, новый Dataset в который и будут добавляться таблицы с данными из json-файлов.

Чтобы можно было строить запросы по всем добавляемым таблицам, мы будем все файлы именовать одинаково, меняя в названии только цифры даты за которую, к примеру, создан файл. Т.е. файлы могут называться так: orders_20190701.json, orders_20190702.json, orders_20190703.json и т. п. Таблицы, создаваемые в BigQuery для этих файлов будут копировать название из загружаемого файла.

4. Переходим к созданию самой функции.

Заходим в интерфейс Cloud Functions в нашем проекте по ссылке console.cloud.google.com/functions/list?project=jsonuploading и нажимаем кнопку Create Function.

В открывшемся интерфейсе мы можем создавать и редактировать наши функции. Задаем имя функции, а в качестве триггера и события выбираем соответственно Cloud Storage и Finalize/Create, поскольку мы хотим вызывать функцию по факту загрузки файла в Cloud Storage.

Во вкладке внутреннего редакторы index.js мы будем писать код нашей функции, а во вкладке pakage.json мы укажем зависимости. Наша исполняемая функция будет называться bqload, что также нужно указать в соответствующем поле Function to execute, расположенном ниже.

Первый блок кода нашей функции — это импорт необходимых модулей:

const assert = require('assert');
const path = require('path');
const {BigQuery} = require('@google-cloud/bigquery');
const {Storage} = require('@google-cloud/storage');
const bigquery = new BigQuery();
const storage = new Storage({
    projectId: 'jsonsuploading',
  });

Далее мы задаем схему создаваемых таблиц в BigQuery, которая естественно должна соответствовать данным в json для загрузки.

В качестве тестовых данных мы будем использовать такой файл:

{"orderID":"76755755","custID":"394834","custType":"WIP","location":"Moscow","date":"2018-12-16","purchasedItems":[{"sku":"product1","description":"Суаер товар номер один","quantity":2,"price":290},{"sku":"product2","description":"Пример другого товара","quantity":3,"price":230}]}
{"orderID":"76755756","custID":"195678","custType":"Ordinary","location":"SanktPeterburg","date":"2018-12-16","purchasedItems":[{"sku":"product3","description":"Третий товар","quantity":10,"price":9},{"sku":"product7","description":"Некий товар под номером семь","quantity":3,"price":130}]}

Скачать файл

Две строчки для примера будет достаточно, но естественно файл может быть практическе насколько угодно большим. Об ограничениях можно почитать здесь - Loading JSON data from Cloud Storage

Схема, которую мы зададим в нашем коде для такой структуры json-данных будет следующей:

const tableschema = [
  {
      "name": "date",
      "type": "DATE",
      "mode": "NULLABLE"
  },
  {
      "name": "location",
      "type": "STRING",
      "mode": "NULLABLE"
  },
  {
      "name": "purchasedItems",
      "type": "RECORD",
      "mode": "REPEATED",
      "fields": [
          {
              "name": "price",
              "type": "INTEGER",
              "mode": "NULLABLE"
          },
          {
              "name": "quantity",
              "type": "INTEGER",
              "mode": "NULLABLE"
          },
          {
              "name": "description",
              "type": "STRING",
              "mode": "NULLABLE"
          },
          {
              "name": "sku",
              "type": "STRING",
              "mode": "NULLABLE"
          }
      ]
  },
  {
      "name": "custType",
      "type": "STRING",
      "mode": "NULLABLE"
  },
  {
      "name": "custID",
      "type": "STRING",
      "mode": "NULLABLE"
  },
  {
      "name": "orderID",
      "type": "STRING",
      "mode": "NULLABLE"
  }
];

Теперь сама исполняемая функция. Смотрите комментарии в коде.

exports.bqload = (event) => {
  const file = event;
  console.log(`File ${file.name} start procesing.`);
  const datasetId = 'uploading';
  const tableId = path.parse(file.name).name;
  const ext = path.parse(file.name).ext
  const jobMetadata = {
    schema: {
      fields: tableschema
    },
    sourceFormat : 'NEWLINE_DELIMITED_JSON',  // указываем какой формат файла для импорта
    createDisposition : 'CREATE_IF_NEEDED', // если таблицы с таким названием нет, создаем новую
    writeDisposition: 'WRITE_TRUNCATE' // если таблица существует, перезаписываем данные
  };

  if (ext == '.json') {
    return bigquery
        .dataset(datasetId)
        .table(tableId)
        .load(storage.bucket(file.bucket).file(file.name), jobMetadata)
        .then(results => {
            const job = results[0];

            // load() waits for the job to finish
            assert.equal(job.status.state, 'DONE');
            console.log(`File ${file.name} processed. Job ${job.id} completed.`);

            // Check the job's status for errors
            const errors = job.status.errors;
            if (errors && errors.length > 0) {
            throw errors;
            }
        })
        .catch(err => {
            console.error('ERROR:', err);
        });
    } else {
        return console.error(`File ${file.name} uploaded has ${ext} extension`);
    }

};

Скачать весь код функции

Теперь переходим во вкладку редактора pakage.json и там прописываем зависимости:

{
  "dependencies": {
    "@google-cloud/bigquery": "^2.0.3",
    "@google-cloud/storage": "^2.3.3"
  }
}

На этом функция готова, нажимаем кнопку Deploy для загрузки и активации функции.

Переходим в Cloud Storage и загружаем в наш bucket файл для примера. Если все ok, через минуту вы увидите новую таблицу в выбранном data set в BigQuery.

Пример запроса к сабсету таблиц:

SELECT * FROM `jsonstreaming.uploading.orders_2019*` WHERE _TABLE_SUFFIX BETWEEN '0701' AND '0703'

Наша функция логирует несколько сообщений о об этапах ее работы, которые можно посмотреть в разделе Stackdriver Logging → Logs Viewer по ссылке console.cloud.google.com/logs/viewer. Там же можно увидеть и ошибки, если таковые будут.

Здесь описан самый базовый алгоритм развертывания функции непосредственно в веб интерфейсе Cloud Fanctions. Это сделано для наглядности последовательности действий. С помощью же дополнительных инструментов вы можете писать код функции и даже в некоторой степени тестировать ее на своем компьютере в удобном для вас редакторе. Информация по данным инструментам доступна в соответствующих сообществах.