Beam BigQuery to XML

Apache Beam is an SDK that abstracts the transformation of big data from a source to a sink (see the Beam documentation).

A standard transformation reads from a source (sometimes joining it with another source), filters and transforms the data, and writes the result to a sink. Since we are dealing with big data, both the source and the sink need to support splitting, so that the workload can be spread over multiple machines.
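As a single-machine analogy only (plain `java.util.stream`, not Beam), the read, filter, transform and write shape looks like the sketch below. The parallel stream splits the in-memory "source" across worker threads, roughly the way a Beam runner splits a source across machines. The class name, the list-based source and sink, and the filter are all made up for illustration.

```java
import java.util.List;
import java.util.stream.Collectors;

public class PipelineShape {
    // Hypothetical stand-ins: an in-memory list is the "source", the returned list is the "sink".
    static List<String> run(List<String> source) {
        return source.parallelStream()             // split the work across threads
                .filter(line -> !line.isBlank())    // filter step: drop empty records
                .map(String::toUpperCase)           // transform step
                .collect(Collectors.toList());      // "write" to the sink; encounter order is kept
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("a", "", "b")));
    }
}
```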

A typical scenario, then, is to read from some storage or database, transform the data and write it to CSV or Avro files. Since each machine writes its results directly to its own files, the output is usually a set of files (shards) that share a naming pattern, each written by a different worker.
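Beam's default file naming follows a pattern along the lines of `output-00000-of-00003.csv`, where the shard index and the total number of shards are encoded in the name, so workers never write to the same file. The helper below is a hypothetical stdlib sketch of that idea, not Beam's own filename policy.

```java
import java.util.ArrayList;
import java.util.List;

public class ShardNames {
    // Builds a name such as "output-00001-of-00003.csv" for one shard.
    static String shardName(String prefix, int shardIndex, int numShards, String suffix) {
        return String.format("%s-%05d-of-%05d%s", prefix, shardIndex, numShards, suffix);
    }

    // One name per shard, indexes 0 .. numShards - 1.
    static List<String> allShardNames(String prefix, int numShards, String suffix) {
        List<String> names = new ArrayList<>();
        for (int i = 0; i < numShards; i++) {
            names.add(shardName(prefix, i, numShards, suffix));
        }
        return names;
    }

    public static void main(String[] args) {
        allShardNames("output", 3, ".csv").forEach(System.out::println);
    }
}
```

The catch is that the file name tells you which shard wrote it, not which record is inside, which is exactly what the scenario below needs to change.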

My scenario was to read records from BigQuery, transform each one to XML and write each record to a separate file, named after an attribute of the XML.

The standard sink, TextIO, writes all of the data to one destination (a single file, or a set of shards sharing one filename prefix). In order to control both the name of each file and the data that goes into it, we need the FileIO sink, which lets us name the files and group the data accordingly.

So before writing to the files, we want a KV (key, value) data structure, where the key carries the file name and determines the grouping, and the value is the actual data to write.
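The plain-Java sketch below (no Beam) illustrates what writing keyed records to dynamic destinations amounts to: records are grouped by key, and each group is written to a file named after that key. `Map.Entry` stands in for Beam's `KV`, and the class name, target directory and `.xml` suffix are assumptions for the example. This is not how FileIO is implemented internally; it only shows the grouping.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class DynamicDestinations {
    // Groups (key, value) pairs by key and writes one file per key: <dir>/<key>.xml.
    // Each value becomes one line, like a line-oriented text sink.
    static void writeByKey(List<Map.Entry<String, String>> records, Path dir) throws IOException {
        Map<String, List<String>> byKey = new LinkedHashMap<>();
        for (Map.Entry<String, String> kv : records) {
            byKey.computeIfAbsent(kv.getKey(), k -> new ArrayList<>()).add(kv.getValue());
        }
        Files.createDirectories(dir);
        for (Map.Entry<String, List<String>> group : byKey.entrySet()) {
            Files.write(dir.resolve(group.getKey() + ".xml"), group.getValue(), StandardCharsets.UTF_8);
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("export");
        writeByKey(List.of(Map.entry("42", "<record>a</record>"), Map.entry("43", "<record>b</record>")), dir);
        System.out.println(Files.readAllLines(dir.resolve("42.xml")));
    }
}
```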

The transformation of the data would be:

@ProcessElement
public void processElement(ProcessContext c) {
    try {
        // Build the XML tooling (recreated for every element in this first version)
        val docFactory = DocumentBuilderFactory.newInstance();
        docBuilder = docFactory.newDocumentBuilder();
        val tf = TransformerFactory.newInstance();
        transformer = tf.newTransformer();
        transformer.setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "yes");
        // New document whose root element name comes from the pipeline options
        val doc = docBuilder.newDocument();
        val options = c.getPipelineOptions().as(ExportBQToXmlOptions.class);
        val rootElement = doc.createElement(options.getRootElement());
        doc.appendChild(rootElement);
        // The BigQuery row as a map of column name to value
        final Map<String, Object> element = c.element();
        // The business id becomes the key, and later the file name
        final String businessId = (String) getBusinessId(element);
        rowToElement(element, doc, rootElement);
        // Serialize the document and remove line breaks,
        // so each record stays on a single line in the output file
        val writer = new StringWriter();
        transformer.transform(new DOMSource(doc), new StreamResult(writer));
        val output = writer.getBuffer().toString().replaceAll("\n|\r", "");
        LOG.info(LogUtils.prefixLog("action=xml, businessId={}"), businessId);
        c.output(KV.of(businessId, output));
    } catch (Exception e) {
        // The record is dropped; log the stack trace as well as the message
        LOG.error(LogUtils.prefixLog(e.getMessage()), e);
    }
}
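The `rowToElement` helper is not shown above. A minimal sketch, assuming it simply turns every column of the row into a child element whose text is the column value (the real helper may treat nested or repeated BigQuery fields differently), using only the JDK's XML APIs. `RowToXml` and `toXml` are hypothetical names.

```java
import java.io.StringWriter;
import java.util.LinkedHashMap;
import java.util.Map;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.transform.OutputKeys;
import javax.xml.transform.Transformer;
import javax.xml.transform.TransformerFactory;
import javax.xml.transform.dom.DOMSource;
import javax.xml.transform.stream.StreamResult;
import org.w3c.dom.Document;
import org.w3c.dom.Element;

public class RowToXml {
    // Hypothetical rowToElement: one child element per column; a null value gives an empty element.
    static void rowToElement(Map<String, Object> row, Document doc, Element parent) {
        for (Map.Entry<String, Object> column : row.entrySet()) {
            Element child = doc.createElement(column.getKey());
            if (column.getValue() != null) {
                child.setTextContent(String.valueOf(column.getValue())); // escaped on serialization
            }
            parent.appendChild(child);
        }
    }

    // Builds the document and serializes it without an XML declaration, as in the DoFn.
    static String toXml(String rootName, Map<String, Object> row) throws Exception {
        Document doc = DocumentBuilderFactory.newInstance().newDocumentBuilder().newDocument();
        Element root = doc.createElement(rootName);
        doc.appendChild(root);
        rowToElement(row, doc, root);
        Transformer transformer = TransformerFactory.newInstance().newTransformer();
        transformer.setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "yes");
        StringWriter writer = new StringWriter();
        transformer.transform(new DOMSource(doc), new StreamResult(writer));
        return writer.toString();
    }

    public static void main(String[] args) throws Exception {
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("businessId", "42");
        row.put("name", "A&B");
        // prints <record><businessId>42</businessId><name>A&amp;B</name></record>
        System.out.println(toXml("record", row));
    }
}
```

Column names must be valid XML element names; `createElement` throws a `DOMException` otherwise, which the `catch` block in the DoFn would turn into a dropped record.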

Since the factory methods for creating the XML objects are expensive and we don't want to call them for every element, we move that code to a method annotated with @StartBundle, which Beam calls once at the start of each bundle of elements:

@StartBundle
@SneakyThrows
public void init() {
    // docBuilder and transformer are fields of the DoFn; processElement
    // now only creates a new Document per element and reuses them
    val docFactory = DocumentBuilderFactory.newInstance();
    docBuilder = docFactory.newDocumentBuilder();
    val tf = TransformerFactory.newInstance();
    transformer = tf.newTransformer();
    transformer.setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "yes");
}
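The stdlib-only sketch below shows the same lifecycle outside Beam: the expensive objects are built once in `init()`, and each record only gets a fresh `Document`. The class, its methods and the `initCount` counter are hypothetical, added only to make the reuse visible. In Beam, a method annotated with `@Setup` runs once per DoFn instance rather than once per bundle, so it would reuse these objects even longer. `DocumentBuilder` and `Transformer` are not thread-safe, but either placement works because each DoFn instance is accessed by a single thread at a time.

```java
import java.io.StringWriter;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.transform.OutputKeys;
import javax.xml.transform.Transformer;
import javax.xml.transform.TransformerFactory;
import javax.xml.transform.dom.DOMSource;
import javax.xml.transform.stream.StreamResult;
import org.w3c.dom.Document;
import org.w3c.dom.Element;

public class ReusableXmlWriter {
    private DocumentBuilder docBuilder;
    private Transformer transformer;
    int initCount = 0; // only here to show that init runs once

    // Plays the role of the @StartBundle (or @Setup) method: build the expensive objects once.
    void init() throws Exception {
        docBuilder = DocumentBuilderFactory.newInstance().newDocumentBuilder();
        transformer = TransformerFactory.newInstance().newTransformer();
        transformer.setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "yes");
        initCount++;
    }

    // Plays the role of @ProcessElement: only a new Document is created per record.
    String process(String rootName, String businessId) throws Exception {
        Document doc = docBuilder.newDocument();
        Element root = doc.createElement(rootName);
        root.setAttribute("businessId", businessId);
        doc.appendChild(root);
        StringWriter writer = new StringWriter();
        transformer.transform(new DOMSource(doc), new StreamResult(writer));
        return writer.toString();
    }

    public static void main(String[] args) throws Exception {
        ReusableXmlWriter w = new ReusableXmlWriter();
        w.init();
        for (String id : new String[] {"1", "2", "3"}) {
            System.out.println(w.process("record", id));
        }
        System.out.println("init calls: " + w.initCount);
    }
}
```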

Our pipeline would then be:

// Every run writes into its own timestamped directory
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd_HH:mm:ss");
LocalDateTime now = LocalDateTime.now();
final Pipeline pipeline = Pipeline.create(options);
pipeline.apply("read data", BigQueryIO.readTableRows()
                .fromQuery(options.getExtractSql())
                .usingStandardSql())
        // TableRow -> KV<businessId, single-line XML>
        .apply("table data map", ParDo.of(mapRowToXml()))
        .apply("to xml", FileIO.<String, KV<String, String>>writeDynamic()
                .to(options.getExportLocation() + "/" + now.format(formatter))
                // the key of each record is its destination
                .by(KV::getKey)
                // write only the XML value, as text
                .via(Contextful.fn(KV::getValue), TextIO.sink())
                .withDestinationCoder(StringUtf8Coder.of())
                // one shard per destination, so each key gets exactly one file
                .withNumShards(1)
                // name the file "<businessId>.xml" instead of using the default shard pattern
                .withNaming(k -> (window, pane, numShards, shardIndex, compression) -> k + ".xml"));
pipeline.run();
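To see where a record ends up, the stdlib sketch below rebuilds the path the pipeline above would produce for one key: the export location, a directory named after the run's start time, and `<businessId>.xml`. `OutputPath`, `pathFor` and the `gs://my-bucket/export` location are made-up examples. Note that the timestamp pattern contains `:`, which Windows file systems do not allow in file names, so a different pattern may be needed when testing locally on Windows.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class OutputPath {
    private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd_HH:mm:ss");

    // Mirrors .to(exportLocation + "/" + now.format(formatter)) combined with
    // the withNaming(k -> ... k + ".xml") file name.
    static String pathFor(String exportLocation, LocalDateTime runStart, String businessId) {
        return exportLocation + "/" + runStart.format(FORMATTER) + "/" + businessId + ".xml";
    }

    public static void main(String[] args) {
        LocalDateTime start = LocalDateTime.of(2020, 1, 2, 3, 4, 5);
        // prints gs://my-bucket/export/2020-01-02_03:04:05/42.xml
        System.out.println(pathFor("gs://my-bucket/export", start, "42"));
    }
}
```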

For the full code, see ExportBQToXmlPipeline.java.

Backend/Data Architect

Backend Group