Bosch IoT Insights

Configuring a pipeline

After you have created a pipeline, you can start configuring it.

A new pipeline version is created automatically for every saved change.
Refer to Managing pipeline versions.



Prerequisites

  • To execute the following actions, you have to be assigned to the Admin role.

  • The Pipeline App has the status STOPPED unless otherwise stated.

Changing the basic information

You can change the Pipeline App Name and/or Pipeline Description in the Basic Information pane of App Settings.

Proceed as follows

  1. Enter a new name in the Pipeline App Name field.

  2. Enter a new description in the Pipeline Description field.

  3. Click Save.

→ The name and/or description of the Pipeline App has been changed.
→ A new pipeline version has been automatically created. Refer to Managing pipeline versions.


Configuring processes and instances

The table in the Processes and Instances pane lists the individual instances together with their CPU, Memory Usage, and Disk Usage. If an instance is currently not running, the corresponding columns display the images/confluence/download/thumbnails/1318021021/icon_not_running-version-1-modificationdate-1594905873000-api-v2.png icon.

The table also displays the Global Processing Timeout in minutes, as well as the Multithreaded Processing status, along with the number of threads.

You can scale your Pipeline App by adding instances if it requires more memory or computation power. You can either increase the number of running instances or order more memory per instance.

Increasing the number of instances and the memory usage affects your account. Check the service plans and our pricing model for detailed information. The pricing can be found in the marketplace where you booked your service plan.

You can also configure the global processing timeout value and choose between single-threaded and multithreaded processing.


Prerequisites

To update the number of instances, the memory usage, and the disk limit (steps 2, 3, and 4 below), the Pipeline App must have the status STARTED.

Proceed as follows

  1. Click the Settings icon images/confluence/download/thumbnails/1318021021/settings-version-1-modificationdate-1728395762000-api-v2.png in the Processes and Instances pane to scale the Pipeline App and to configure Advanced Settings.
    → The Scale Pipeline App dialog is displayed.

    images/confluence/download/attachments/1318021021/configure_pipeline_advanced_settings-version-1-modificationdate-1728395762000-api-v2.png
  2. In the Instances field, enter the number of instances you want to add.

  3. In the Memory Usage drop-down list, select a value to adjust the memory usage.

  4. In the Disk Limit drop-down list, select a value to adjust the disk limit.

  5. Click Advanced Settings to access the following additional configuration options:

    1. In the Default processing step timeout (in minutes) field, enter a value in minutes that is applied to processing steps for which no other timeout is defined.
      The default and minimum value is one minute; the maximum value is 24 hours (1440 minutes).

    2. In the Threading configuration area, choose between Single threaded or Multithreaded (default) processing.
      The default number of threads is five, which means that up to five input files are processed in parallel.

      Although you can change these advanced settings, we do not recommend doing so, as this could lead to unforeseen behavior and issues.

  6. Click the Apply Changes button to save your changes.
    → The Pipeline App has been scaled.
    → The Advanced configuration settings for processing timeout and threading are applied.
    → A new pipeline version has been automatically created. Refer to Managing pipeline versions.

Adding environment variables

Environment variables behave like standard system variables. Using environment variables, you can set parameters in the User-Provided Environment Variables pane to have them available in your application code.

If you enter a valid environment variable, it is saved automatically to the backend.

Proceed as follows

  1. Click the Add Environment Variable button in the User-Provided Environment Variables pane.
    → A new line is added.

  2. Enter a Name for the environment variable.

  3. Enter a Value for the environment variable.
    → The environment variable has been set.

  4. Click the Save button.
    → A new pipeline version has been automatically created. Refer to Managing pipeline versions.

Example

In the following image, you see that the environment variable APP_ENVIRONMENT=DEV has been set. The SFDE_PIPELINE environment variable is set by default and cannot be changed.

images/confluence/download/attachments/1318021021/Processing_pipelines_environment_variables_20_06_17-version-3-modificationdate-1664530173000-api-v2.png

Depending on the programming language, you can access this environment variable inside your application.

In a Java application, you can print the variable as follows:

System.out.println(System.getenv("APP_ENVIRONMENT")); // -> DEV

In a Python script, you can print the variable as follows:

import os
print(os.environ['APP_ENVIRONMENT'])  # -> DEV
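
If the variable might not be set in every environment, you can read it with a fallback value to avoid a KeyError. This is a minimal sketch using only the Python standard library; the fallback value PROD is an illustrative assumption:

import os

# Fall back to 'PROD' if APP_ENVIRONMENT is not set (the fallback value is hypothetical).
environment = os.environ.get('APP_ENVIRONMENT', 'PROD')
print(environment)  # -> DEV if APP_ENVIRONMENT=DEV is configured as shown above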

Configuring the pipeline steps

Each pipeline consists of three steps:

  • Input trigger
    The input trigger step is the first step of every pipeline. It can only exist once per pipeline. The defined query filters the incoming data for processing.

  • Parser
    The parser is used to transform the input data. A default parser is available that automatically detects the file type of the input data and transforms the input data accordingly. Multiple parser steps can be added.

  • Output
    The output step is the last step of every pipeline. It can only exist once per pipeline.

Each pipeline can be expanded by the following steps:

  • Parser
    See above.

  • Custom
    In a custom step, you can upload your own code written in Python or Java to interact with and transform the input data.
    In the Examples chapter, you can find information on how to configure the custom step; refer to Pipelines: Configuring the custom step.

    The custom processing logic and its Intellectual Property (IP) belong to the project owner.
    The project owner is liable for damages which are caused by the custom processing logic.
    We advise you to perform security and validity scans (especially when using open source code).
    We also advise you to verify open source license compliance of your custom processing logic.

  • Device Position
    In a device position step, the GPS coordinates of a device are extracted and stored in its digital twin representation.

Proceed as follows

  1. Open the Configuration tab and configure or update the relevant step.

  2. When ready, click the Save button.
    → The configuration is saved.
    → A pipeline version is automatically created and can be tracked in the Change History tab. Refer to Managing pipeline versions.

    images/confluence/download/attachments/1318021021/input_trigger-version-1-modificationdate-1719312341000-api-v2.png


Configuring the input trigger

Proceed as follows

  1. In the Configuration tab, select the Input Trigger item in the sequence.

  2. In the Collection field, enter the collection that contains your input data if your project has more than one input collection.

  3. Set a trigger for the pipeline:

    1. Activate the Match all inserted files radio button to process all files uploaded to the database.

    2. Activate the Custom Filter radio button to trigger the pipeline only if the input data matches specific requirements.

    3. If you selected Custom Filter, insert a query into the code box.

      The syntax is based on the MongoDB Query Language.

  4. Click the Save button if you want to stop here, or continue with the next step.

Processing of duplicates

If you do not want to process duplicates, enter the following query:

{
    "tags": { "$not": { "$elemMatch": { "$eq": "duplicate" } } },
    "metaData.yourKey": "yourMatchText"
}

If you want to process duplicates, enter the following query:

{
    "metaData.yourKey": "yourMatchText"
}

Configuring the parser

Proceed as follows

  1. Select the Parser item in the sequence.

  2. Optionally, click the Edit icon images/confluence/download/thumbnails/1318021021/icon_edit_history-version-1-modificationdate-1594905873000-api-v2.png to change the parser's name.

  3. In the Parser Type drop-down list, select a parser type, e.g. ZIP file, JSON, Vector DBC, etc.

    Some parser types are described in detail in the following.

    For optimal performance, we recommend a data size of less than 500 MB.

    → The output is a JSON file created from the content of the input data.


ASAM MDF parser step

This parser is used for the ASAM Measurement Data Format (MDF) and is best suited for a time series output collection.

Proceed as follows

  1. In the Parser Type drop-down list, select the ASAM MDF parser type.

  2. Add the environment setting MDF_IGNORE_UNKOWN_BLOCKS=1 to ignore unknown blocks inside the MDF file while parsing. With this setting, the decoder only displays a warning instead of stopping the parsing process when it encounters special MDF blocks.

  3. Select a time series collection, which should be configured as follows:

    1. The Time field must be the same as the time attribute of the MDF file.

    2. The Meta field should be defined and will be set to inputDataId.

If a time series collection is selected, the parser step output is flattened and looks similar to the following:

[{
    <TimeField>: '2020-01-05T10:00:01.000Z',
    'dwordCounter': 10,
    'PWMFiltered': 99,
    'Triangle': 37,
    'inputDataId': '5e1729cb84874f0015dd5014',
    'fileName': 'Vector_MinimumFile.MF4',
    'file': 'generic',
    'Timeclass': 'LocalPCreferencetime',
    'Startdistance': 0,
    'processedAt': '2020-01-09T13:25:31.554Z',
    'Starttime': '2011-08-24T13:53:19Z',
    'DataGroupBlock': 1,
    'receivedAt': '2020-01-09T13:25:31.483Z',
    'Startangle': 0,
    'Name': 'MinimumMDF4.1filederivedfromCANapefilebyremovingsomeblocks',
    <MetaField>: '5e1729cb84874f0015dd5014'
},
{
    <TimeField>: '2020-01-05T10:00:02.000Z',
    'dwordCounter': 124637,
    'PWMFiltered': 110,
    'Triangle': 47,
    'inputDataId': '5e1729cb84874f0015dd5014',
    'fileName': 'Vector_MinimumFile.MF4',
    'file': 'generic',
    'Timeclass': 'LocalPCreferencetime',
    'Startdistance': 0,
    'processedAt': '2020-01-09T13:25:31.554Z',
    'Starttime': '2011-08-24T13:53:19Z',
    'DataGroupBlock': 1,
    'receivedAt': '2020-01-09T13:25:31.483Z',
    'Startangle': 0,
    'Name': 'MinimumMDF4.1filederivedfromCANapefilebyremovingsomeblocks',
    <MetaField>: '5e1729cb84874f0015dd5014'
}]

Vector DBC parser step

To parse input data of the type DBC, the CAN configuration must be customized with a regular expression. In addition, metadata can be matched with expressions and labeled accordingly. There is also a built-in text editor to test these expressions against the input data.

Proceed as follows

  1. In the Parser Type drop-down list, select the Vector DBC parser type.

  2. Optionally, click the Edit icon images/confluence/download/thumbnails/1318021021/icon_edit_history-version-1-modificationdate-1594905873000-api-v2.png to change the parser's name.

  3. In the attachment box, select or drop a ZIP file containing the input data of the type DBC.

  4. In the Vector DBC Parser Settings pane, enter a Regular Expression.

  5. Define the DeltaTime Group, CAN ID Group, and CAN Payload Group in the corresponding drop-down lists.

  6. Activate the CAN v2.0 checkbox to allow message IDs with a length of 29 bits.

    CAN v2.0 is activated by default. If you deactivate the checkbox, CAN v1.0 is used with a length of 11 bits.

  7. Activate the HEX Encoded checkbox to use HEX values.

    HEX Encoded is activated by default. If you deactivate the checkbox, integer values are used.

  8. Optionally, in the Metadata Fields pane, click the Add Metadata button to add a Metadata trace selection.
    → A new line is added.

    You can add a maximum of 10 metadata parameters.

  9. In the CAN Trace Evaluation pane, click the Test Regex button to test your regular expressions and the metadata rules with an example CAN trace.
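
To get a feeling for the kind of regular expression the Vector DBC Parser Settings expect, the following sketch tests an expression against a single, hypothetical space-separated CAN trace line (time delta, CAN ID, payload bytes). The trace format, group names, and values are assumptions for illustration only; your real trace format and the way groups are referenced in the settings may differ. The sketch runs with standard Python, so you can try the expression locally before entering it in Insights:

import re

# Hypothetical trace line: "<delta time in s> <CAN ID, hex> <payload bytes, hex>"
trace_line = '0.015 1A3 01 02 03 04 05 06 07 08'

# The named groups correspond to the DeltaTime Group, CAN ID Group, and CAN Payload Group.
pattern = re.compile(
    r'^(?P<deltatime>\d+\.\d+)\s+'
    r'(?P<canid>[0-9A-Fa-f]+)\s+'
    r'(?P<payload>(?:[0-9A-Fa-f]{2}\s?)+)$'
)

match = pattern.match(trace_line)
if match:
    print(match.group('deltatime'))  # -> 0.015
    print(match.group('canid'))      # -> 1A3
    print(match.group('payload'))    # -> 01 02 03 04 05 06 07 08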

FIBEX parser step

To parse input data of the type Fibex, the configuration must be customized with a regular expression. In addition, metadata can be matched with expressions and labeled accordingly. There is also a built-in text editor to test these expressions against the input data.

Proceed as follows

  1. In the Parser Type drop-down list, select the Fibex parser type.

  2. Optionally, click the Edit icon images/confluence/download/thumbnails/1318021021/icon_edit_history-version-1-modificationdate-1594905873000-api-v2.png to change the parser's name.

  3. In the attachment box, select or drop a ZIP file containing the input data of the type Fibex(XML).

  4. In the FIBEX Parser Settings pane, enter a Regular Expression.

  5. Define the DeltaTime Group, PDU ID, and PDU ID Group in the corresponding drop-down lists.

  6. Activate the HEX Encoded checkbox to use HEX values.

    HEX Encoded is activated by default. If you deactivate the checkbox, integer values are used.

  7. Optionally, in the Metadata Fields pane, click the Add Metadata button to add a Metadata trace selection.
    → A new line is added.

    You can add a maximum of 10 metadata parameters.

  8. In the Fibex Trace Evaluation pane, click the Test Regex button to test your regular expressions and the metadata rules with an example FIBEX trace.

iTraMS parser step

Proceed as follows

  1. In the Parser Type drop-down list, select the iTraMS parser type.

  2. Optionally, click the Edit icon images/confluence/download/thumbnails/1318021021/icon_edit_history-version-1-modificationdate-1594905873000-api-v2.png to change the parser's name.

  3. In the attachment box, select or drop a ZIP file containing the iTraMS decoder specification.

  4. Optionally, test the iTraMS decoder. To do so, select or drop a bin file in the attachment box of the Try Out section. If the file could be decoded, the processed data is shown.

    images/confluence/download/attachments/1318021021/parser_step_iTraMS_try_out_section-version-2-modificationdate-1611321485000-api-v2.png

Configuring the output

Proceed as follows

  1. Select the Output item in the sequence.

  2. In the Output Collection drop-down list, select a collection in which the results should be stored.

    To create a new collection, refer to Creating a new collection.

  3. If the pipeline contains an iTraMS parser step, an iTraMS Time Bucketing section is displayed. Optionally, select a time bucket size or turn time bucketing off completely.

    images/confluence/download/attachments/1318021021/output_step_iTraMS_time_bucketing_section-version-2-modificationdate-1611321485000-api-v2.png

The output data must not exceed 16 MB per document. Otherwise, the document cannot be saved to the database.
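
As a rough safeguard, for example inside a custom step, you can estimate a document's serialized size before emitting it. This is only a sketch: the helper function is hypothetical, and the JSON-based calculation approximates the stored size rather than measuring it exactly.

import json

MAX_DOCUMENT_BYTES = 16 * 1024 * 1024  # 16 MB limit per output document

def is_within_limit(document: dict) -> bool:
    # Approximate the stored size via the UTF-8 encoded JSON representation.
    return len(json.dumps(document).encode('utf-8')) <= MAX_DOCUMENT_BYTES

print(is_within_limit({'value': 42}))  # -> True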

Creating a new collection

There are two subtypes of collections:

  • Standard
    For regular use cases

  • Time Series
    To store sequences of measurements

    We recommend using documents with a flat data structure. If a parser step is used in the pipeline, the flat data structure is applied automatically. The flat data structure prevents the generation of the sub-entries "metadata" and "payload", which are otherwise generated by default. This means that these two items are not present in the output JSON (an illustrative flat document is shown after the procedure below).

Proceed as follows

  1. Click the New Collection button.
    → The Create New Collection dialog is displayed.

  2. In the Technical Name field, enter a technical name for the collection.

  3. In the Label field, enter a label that will be displayed.

  4. In the SubType drop-down list, select the subtype of the new collection.

  5. If you selected Time Series, proceed as follows:

    1. In the Time Field field, enter the field that contains the timestamp in the time series document.

      The time field must be on root level.

    2. In the Meta Field field, enter a field name that contains metadata, e.g. deviceId.

      Only top-level fields are allowed as metadata.

    3. In the Expire after days field, define the number of days after which the documents will be deleted.

    4. In the Granularity drop-down list, select the unit that most closely matches the time span between consecutive sensor measurements.

  6. Click the Create button.
    → The new collection is created.
    → A new query template for this collection is created, refer to Default query template for time series collections.
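
For illustration only, a flat document in such a time series collection could look like the following. The field names ts (the configured Time Field), deviceId (the configured Meta Field), and the measurement values are hypothetical placeholders:

{
    'ts': '2020-01-05T10:00:01.000Z',
    'deviceId': 'vehicle-0815',
    'speed': 87.5,
    'engineTemperature': 92
}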

Configuring additional custom, parser, and device position steps

It is possible to add any number of parser and custom steps to a pipeline.

Proceed as follows

  1. Hover over the Plus icon images/confluence/download/thumbnails/1318021021/icon_plus-version-1-modificationdate-1594905873000-api-v2.png and click the Add Step button.

  2. Select either Custom, Parser, or Device Position.

  3. If you selected Parser, proceed as described in the Configuring the parser section.

  4. If you selected Device Position, proceed as described in the Configuring the device position step section.

  5. If you selected Custom, proceed with the following steps.

  6. Optionally, click the Edit icon images/confluence/download/thumbnails/1318021021/icon_edit_history-version-1-modificationdate-1594905873000-api-v2.png to change the step's name.

  7. In the attachment box, select or drop a ZIP file containing your own code written in Python or Java.

  8. Optionally, in the Step timeout (in minutes) field, enter a value in minutes after which the processing is aborted. Change this value only if timeouts occur due to long-running processing; otherwise, keep the default value.

Custom step examples

The code examples in the Pipelines: Configuring the custom step chapter give you a first impression of how to configure the custom step.
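
As a strongly simplified sketch (not the actual step contract, which is documented in the examples and README files referenced above), the following assumes that the injected input reaches the custom step as JSON on standard input and that the transformed result is handed back as JSON on standard output. The transformation itself is a hypothetical placeholder:

import json
import sys

def transform(document: dict) -> dict:
    # Hypothetical transformation: add a processing marker to the document.
    document['customStepProcessed'] = True
    return document

if __name__ == '__main__':
    input_document = json.load(sys.stdin)    # assumed injection via stdin
    output_document = transform(input_document)
    json.dump(output_document, sys.stdout)   # assumed hand-over via stdout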

Configuring the device position step

The supported format of the GPS coordinates is WGS-84. Incorrect GPS coordinates are filtered out.

Proceed as follows

  1. In the Meta Information pane, in the Device ID field, select the device ID path from the subscription namespace of Bosch IoT Things, e.g. payload.id.

  2. In the Timestamp field, select the timestamp path from the subscription namespace of Bosch IoT Things, e.g. payload.timestamp.

  3. In the Device type field, select the device type the new devices will be assigned to.

  4. In the GPS Coordinates pane, in the Latitude field, select the latitude path from the subscription namespace of Bosch IoT Things.

  5. In the Longitude field, select the longitude path from the subscription namespace of Bosch IoT Things.

  6. In the Altitude field, select the altitude path.

  7. Open the Advanced Settings drop-down menu to configure options that affect the general processing behavior.

    1. Activate the Auto-create missing devices toggle to automatically create a device whose identifier is not yet known in Bosch IoT Insights.
      → The device is created based on the selected device type.

    2. Activate the Fail on invalid data toggle to fail the processing of the data upload if mandatory fields are empty or invalid data is encountered. If the toggle is deactivated, the document with invalid data is skipped and the processing of subsequent documents continues.

    3. In the GPS jitter validation (ms) field, enter a time window in milliseconds over which all input data (GPS values, timestamps, altitude) is averaged to prevent excessive device updates and entries in the database.

    4. In the Time range validation field, select a start date and an end date for which the timestamp values in the input documents are valid.

  8. Open the Tour Generation Settings drop-down menu to decide whether a tour should be generated with the incoming data.

    1. Activate the Tour generation toggle to generate a tour of the given location information.

    2. In the Tour generation collection drop-down list, select a collection to store the tour.

    3. In the Speed validation (km/h) field, enter a value in km/h. Documents with a higher speed than the configured value are skipped and not processed. Such unrealistic jumps in the data may result from GPS signal inaccuracies (a sketch illustrating this kind of check follows the procedure).

    4. In the Tour time separator (min) field, enter a value in minutes after which a new tour document will be started.
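
To illustrate the idea behind the speed validation (this is not the built-in implementation), the following sketch computes the speed between two consecutive WGS-84 positions with the haversine formula and flags the newer point as implausible if the configured limit is exceeded. The 250 km/h threshold and the tuple layout are example assumptions:

from math import asin, cos, radians, sin, sqrt

SPEED_LIMIT_KMH = 250  # example value for "Speed validation (km/h)"

def haversine_km(lat1, lon1, lat2, lon2):
    # Great-circle distance between two WGS-84 coordinates in kilometers.
    lat1, lon1, lat2, lon2 = map(radians, (lat1, lon1, lat2, lon2))
    a = sin((lat2 - lat1) / 2) ** 2 + cos(lat1) * cos(lat2) * sin((lon2 - lon1) / 2) ** 2
    return 2 * 6371.0 * asin(sqrt(a))

def is_plausible(prev, curr):
    # prev and curr: (latitude, longitude, timestamp in seconds since epoch)
    hours = (curr[2] - prev[2]) / 3600.0
    if hours <= 0:
        return False
    speed_kmh = haversine_km(prev[0], prev[1], curr[0], curr[1]) / hours
    return speed_kmh <= SPEED_LIMIT_KMH

print(is_plausible((48.77, 9.18, 0), (48.78, 9.19, 60)))  # -> True (roughly 80 km/h)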

Processing large files from raw data storage

We recommend processing large files (100 MB and above) via streaming in a custom step. Depending on the program logic, this may be required to prevent memory congestion.

To do so, you need to create a custom step that can handle two types of input files. First, change the configuration in the manifest so that you receive payload references for large files (>10 KB); small files are embedded directly in the injected data. Then configure the corresponding custom step. For more information on how to configure such a step, refer to the code examples and the corresponding README files in Pipelines: Configuring the custom step.
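
As a hedged sketch of the streaming idea, the following assumes that the payload reference arrives as a plain download URL and uses the requests library; the actual reference format and download mechanism are described in the referenced code examples and README files:

import requests

def process_large_payload(download_url: str) -> int:
    # Stream the referenced payload in chunks instead of loading it into memory at once.
    processed_bytes = 0
    with requests.get(download_url, stream=True, timeout=60) as response:
        response.raise_for_status()
        for chunk in response.iter_content(chunk_size=1024 * 1024):  # 1 MB chunks
            processed_bytes += len(chunk)  # replace with your own chunk-wise processing
    return processed_bytes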