Python
You can download the following code examples as cloud-examples-python.zip and import them into your development IDE.
Make sure to set your username and password as well as the appropriate proxy and the desired Bosch IoT Insights project before running the applications.
For the basic authentication, use the credentials of the API user. To create an API user, refer to Creating an API user or Creating an API user via API.
We recommend using preemptive authentication. That way, the basic authentication credentials are sent with the first request instead of only after the server has returned an unauthorized response. Also refer to the Apache documentation.
Sending simple data
The following Python code example shows you how to send simple data, such as a string, to the HTTP Data Recorder Service of the Bosch IoT Insights backend.
import
base64
from
urllib.request
import
Request, ProxyHandler, build_opener
# Credentials of the API user; they are turned into the basic
# authentication header of every request.
INSIGHTS_USERNAME = '<username>'
INSIGHTS_PASSWORD = '<password>'

# Proxy configuration handed to urllib's ProxyHandler.
# If you are inside your company network, a proxy authentication may be
# required. Otherwise, you can remove this from the example. This is an
# example for a Bosch internal proxy.
PROXIES = {
    'http': 'http://<username>:<password>@rb-proxy-de.bosch.com:8080',
    'https': 'https://<username>:<password>@rb-proxy-de.bosch.com:8080',
}

# Name of the Bosch IoT Insights project; it is appended to the request URL.
insightsProject = 'demo'
def basics_auth(username, password):
    """Build the value of an HTTP ``Authorization`` header for basic authentication.

    The credentials are joined as ``username:password``, encoded to bytes and
    Base64-encoded. UTF-8 is used for the byte encoding: for pure-ASCII
    credentials the result is identical to an ASCII encoding, while
    credentials with non-ASCII characters no longer raise UnicodeEncodeError.

    :param username: your Bosch IoT Insights username
    :param password: the password of that user
    :return: the header value as a string, e.g. ``'Basic dXNlcjpwYXNz'``
    """
    credential = username + ':' + password
    # NOTE(review): RFC 7617 names UTF-8 for internationalised credentials;
    # this assumes the server decodes the header the same way.
    token = base64.b64encode(credential.encode('utf-8')).decode('ascii')
    return 'Basic ' + token
def make_request(server_url, post_data=None):
    """Send a request to the project's service URL and return the response text.

    The module-level ``insightsProject`` is appended to ``server_url``. The
    request is sent through the proxies configured in ``PROXIES`` and carries
    a basic authentication header built from ``INSIGHTS_USERNAME`` and
    ``INSIGHTS_PASSWORD``.

    :param server_url: the service url up to (not including) the project name
    :param post_data: request body as bytes; when it is given urllib sends a
        POST request, otherwise a GET request
    :return: the response body decoded as UTF-8 (it is not parsed as JSON)
    :raises urllib.error.URLError: when the request fails; HTTP error
        responses are raised as its subclass ``HTTPError``
    """
    opener = build_opener(ProxyHandler(PROXIES))

    request = Request(server_url + insightsProject, data=post_data)
    request.add_header('Authorization',
                       basics_auth(INSIGHTS_USERNAME, INSIGHTS_PASSWORD))
    request.add_header('Content-Type', 'application/json')
    request.add_header('Accept', 'application/json')

    # The context manager closes the response (and its connection) even when
    # reading or decoding fails; previously the handle was never closed.
    with opener.open(request) as handle:
        return handle.read().decode('utf8')
def main():
    """Send a small example payload to the HTTP Data Recorder Service.

    Prints the response text on success. On an IOError the error is printed,
    followed by the error response body when the exception provides one.
    """
    # Server and the api service details
    server = 'https://bosch-iot-insights.com'
    base_path = '/data-recorder-service/'
    service = 'v2/'
    payload = "{Python:'Hello World'}"

    # make_request() appends the project name to this url
    url = server + base_path + service
    # the request body has to be bytes; the payload is plain ASCII
    data = payload.encode("ascii")

    try:
        res = make_request(url, data)
        print(res)
        print('Data was send to Project:' + insightsProject)
    except IOError as e:
        print("Problem with the request..")
        print(e)
        # Only HTTP error responses carry a readable body. Other IOErrors
        # (e.g. a failed connection) have no read(), and calling it
        # unconditionally raised AttributeError inside this handler.
        read_body = getattr(e, 'read', None)
        if callable(read_body):
            print(read_body())


if __name__ == '__main__':
    main()
Sending a file
The following code example shows you how to send the content of a file to the HTTP Data Recorder Service of the Bosch IoT Insights backend using Python code.
import
base64
import
requests
# TODO: Please specify your technical user account (not SingleKey ID / CIAM)
INSIGHTS_USERNAME = 'technicalUserName'
INSIGHTS_PASSWORD = 'technicalUserPW'

# TODO: Please specify your project id (Hint: can be taken from the url)
insightsProject = 'youProjectName'

# Server and the api service details; the upload url is built by
# concatenating server, serviceBaseUrl, service and the project id.
server = 'https://bosch-iot-insights.com'
serviceBaseUrl = '/data-recorder-service/'
service = 'v2/'
def basics_auth(username, password):
    """Build the value of an HTTP ``Authorization`` header for basic authentication.

    The credentials are joined as ``username:password``, encoded to bytes and
    Base64-encoded. UTF-8 is used for the byte encoding: for pure-ASCII
    credentials the result is identical to an ASCII encoding, while
    credentials with non-ASCII characters no longer raise UnicodeEncodeError.

    :param username: your IoT Insights username
    :param password: your IoT Insights password
    :return: the header value as a string, e.g. ``'Basic dXNlcjpwYXNz'``
    """
    credential = username + ':' + password
    # NOTE(review): RFC 7617 names UTF-8 for internationalised credentials;
    # this assumes the server decodes the header the same way.
    token = base64.b64encode(credential.encode('utf-8')).decode('ascii')
    return 'Basic ' + token
def make_request(file, metaData):
    """Upload a file to the data recorder endpoint of the configured project.

    :param file: mapping that is passed to ``requests.post`` as ``files``
    :param metaData: value sent in the ``X-Metadata`` request header
    :return: the response body as text
    """
    endpoint = ''.join((server, serviceBaseUrl, service, insightsProject))
    # NOTE(review): the response status is not checked here, so the body of
    # an error response is returned just like a successful one.
    response = requests.post(
        endpoint,
        headers={
            'Authorization': basics_auth(INSIGHTS_USERNAME, INSIGHTS_PASSWORD),
            'X-Metadata': metaData,
        },
        files=file,
    )
    return response.text
def main():
    """Upload one local file together with metadata and print the response.

    On an IOError (for example when the file cannot be opened) the error is
    printed, followed by an error response body when the exception provides
    one.
    """
    # Specify the path to the file that should be uploaded
    path = 'C:\\Path\\To\\Your\\File.txt'
    # Add your metaData parameters here as comma separated list of key-value pairs
    # This will be seen and useable in the input history of IoT Insights
    metaData = 'key=value'

    try:
        # The context manager closes the file as soon as the upload has
        # finished or failed; previously the handle was never closed.
        with open(path, 'rb') as upload:
            res = make_request({'file': upload}, metaData)
        print(res)
        print('Data was send to Project:' + insightsProject)
    except IOError as e:
        print("Problem with the request..")
        print(e)
        # Not every IOError has read() (the error raised by open() for a
        # missing file does not); calling it unconditionally raised
        # AttributeError inside this handler.
        read_body = getattr(e, 'read', None)
        if callable(read_body):
            print(read_body())


if __name__ == '__main__':
    main()
Synchronous query execution
This code example shows you how to execute a MongoDB aggregation query against a demo collection.
import
base64
import
json
from
urllib.request
import
Request, ProxyHandler, build_opener
# Credentials of the API user; they are turned into the basic
# authentication header of every request.
INSIGHTS_USERNAME = '<username>'
INSIGHTS_PASSWORD = '<password>'

# Proxy configuration handed to urllib's ProxyHandler.
# If you are inside your company network, a proxy authentication may be
# required. Otherwise, you can remove this from the example. This is an
# example for a Bosch internal proxy.
PROXIES = {
    'http': 'http://<username>:<password>@rb-proxy-de.bosch.com:8080',
    'https': 'https://<username>:<password>@rb-proxy-de.bosch.com:8080',
}

# Name of the Bosch IoT Insights project; it becomes part of the query
# service url.
insightsProject = 'demo'
def basics_auth(username, password):
    """Build the value of an HTTP ``Authorization`` header for basic authentication.

    The credentials are joined as ``username:password``, encoded to bytes and
    Base64-encoded. UTF-8 is used for the byte encoding: for pure-ASCII
    credentials the result is identical to an ASCII encoding, while
    credentials with non-ASCII characters no longer raise UnicodeEncodeError.

    :param username: your Bosch IoT Insights username
    :param password: the password of that user
    :return: the header value as a string, e.g. ``'Basic dXNlcjpwYXNz'``
    """
    credential = username + ':' + password
    # NOTE(review): RFC 7617 names UTF-8 for internationalised credentials;
    # this assumes the server decodes the header the same way.
    token = base64.b64encode(credential.encode('utf-8')).decode('ascii')
    return 'Basic ' + token
def make_request(server_url, method, post_data=None):
    """Send a request and return the JSON response as Python objects.

    The request is sent through the proxies configured in ``PROXIES`` and
    carries a basic authentication header built from ``INSIGHTS_USERNAME``
    and ``INSIGHTS_PASSWORD``.

    :param server_url: the url for the request
    :param method: HTTP method to use, ``'GET'`` or ``'POST'``
    :param post_data: request body as bytes for a POST request
    :return: the response body parsed with ``json.loads``
    :raises urllib.error.URLError: when the request fails; HTTP error
        responses are raised as its subclass ``HTTPError``
    """
    opener = build_opener(ProxyHandler(PROXIES))

    # ``method`` used to be ignored (urllib then picks POST or GET depending
    # on whether a body is present); it is now passed on explicitly.
    request = Request(server_url, data=post_data, method=method)
    request.add_header('Authorization',
                       basics_auth(INSIGHTS_USERNAME, INSIGHTS_PASSWORD))
    request.add_header('Content-Type', 'application/json')
    request.add_header('Accept', 'application/json')

    # The context manager closes the response (and its connection) even when
    # reading or decoding fails; previously the handle was never closed.
    with opener.open(request) as handle:
        content = handle.read().decode('utf8')
    return json.loads(content)
def main():
    """Execute an aggregation query synchronously and print the result.

    The query limits the ``demo_processed_data`` collection to one document.
    On an IOError the error is printed.
    """
    # Server and the api service details
    server = 'https://bosch-iot-insights.com'
    serviceBaseUrl = '/mongodb-query-service/v2/' + insightsProject
    service = '/execute-aggregation-query'

    # The query is kept as Python objects and serialised with json.dumps, so
    # the request body is valid JSON by construction.
    query = {
        "collection": "demo_processed_data",
        "query": [
            {"$limit": 1},
        ],
    }

    try:
        # url of the POST request that executes the query
        url = server + serviceBaseUrl + service
        # the request body has to be bytes
        data = json.dumps(query).encode("utf-8")
        res = make_request(url, 'POST', data)
        print(res)
    except IOError as e:
        print("Problem with the request..")
        print(e)


if __name__ == '__main__':
    main()
Asynchronous query execution
The following code example shows you how to submit an asynchronous MongoDB aggregation query, poll for its completion, and write the result to a CSV file using Python code.
Make sure to set your authorization credentials and if necessary the appropriate proxy settings.
import
base64
import
csv
import
json
import
time
from
urllib.request
import
Request, ProxyHandler, build_opener
# Credentials of the API user; they are turned into the basic
# authentication header of every request.
INSIGHTS_USERNAME = '<username>'
INSIGHTS_PASSWORD = '<password>'

# Proxy configuration handed to urllib's ProxyHandler.
# If you are inside your company network, a proxy authentication may be
# required. Otherwise, you can remove this from the example. This is an
# example for a Bosch internal proxy.
PROXIES = {
    'http': 'http://<username>:<password>@rb-proxy-de.bosch.com:8080',
    'https': 'https://<username>:<password>@rb-proxy-de.bosch.com:8080',
}

# Name of the Bosch IoT Insights project; it becomes part of the query
# service url.
insightsProject = 'demo'
def basics_auth(username, password):
    """Build the value of an HTTP ``Authorization`` header for basic authentication.

    The credentials are joined as ``username:password``, encoded to bytes and
    Base64-encoded. UTF-8 is used for the byte encoding: for pure-ASCII
    credentials the result is identical to an ASCII encoding, while
    credentials with non-ASCII characters no longer raise UnicodeEncodeError.

    :param username: your Bosch IoT Insights username
    :param password: the password of that user
    :return: the header value as a string, e.g. ``'Basic dXNlcjpwYXNz'``
    """
    credential = username + ':' + password
    # NOTE(review): RFC 7617 names UTF-8 for internationalised credentials;
    # this assumes the server decodes the header the same way.
    token = base64.b64encode(credential.encode('utf-8')).decode('ascii')
    return 'Basic ' + token
def make_request(server_url, method, post_data=None):
    """Send a GET or POST request and return the JSON response as Python objects.

    The request is sent through the proxies configured in ``PROXIES`` and
    carries a basic authentication header built from ``INSIGHTS_USERNAME``
    and ``INSIGHTS_PASSWORD``.

    :param server_url: the url for the request
    :param method: ``'GET'`` or ``'POST'``
    :param post_data: request body as bytes; only sent for ``'POST'``
    :return: the response body parsed with ``json.loads``, or ``None`` (after
        printing a message) when ``method`` is neither ``'GET'`` nor ``'POST'``
    :raises urllib.error.URLError: when the request fails; HTTP error
        responses are raised as its subclass ``HTTPError``
    """
    if method == 'POST':
        body = post_data
    elif method == 'GET':
        # a GET request is sent without a body, so post_data is ignored
        body = None
    else:
        # The message used to read "is available", the opposite of what
        # this branch reports.
        print('Method %s is not available' % method)
        return None

    opener = build_opener(ProxyHandler(PROXIES))

    # Passing the method to Request replaces overriding Request.get_method
    # and lets both methods share one code path.
    request = Request(server_url, data=body, method=method)
    request.add_header('Authorization',
                       basics_auth(INSIGHTS_USERNAME, INSIGHTS_PASSWORD))
    request.add_header('Content-Type', 'application/json')
    request.add_header('Accept', 'application/json')

    # The context manager closes the response (and its connection) even when
    # reading or decoding fails; previously the handle was never closed.
    with opener.open(request) as handle:
        content = handle.read().decode('utf8')
    return json.loads(content)
def main():
    """Submit an aggregation query, poll until it has finished and save the result.

    The query is submitted with a POST request, its status is polled with GET
    requests, and once the status is ``'SUCCESSFUL'`` the result is fetched
    as JSON, written to ``vin_ccu_brake_events.csv`` and printed. If the
    status is not ``'SUCCESSFUL'`` after the last polling attempt, an error
    message with the last status is printed instead.
    """
    # Server and the api service details
    server = 'https://bosch-iot-insights.com'
    serviceBaseUrl = '/mongodb-query-service/v2/' + insightsProject
    service = '/submit-aggregation-query'

    # The query is kept as Python objects and serialised with json.dumps, so
    # the request body is valid JSON by construction.
    query = {
        "collection": "demo_processed_data",
        "query": [
            {"$limit": 10},
        ],
    }

    # Polling is bounded so that a query which never becomes SUCCESSFUL no
    # longer blocks forever; 300 attempts with 2 seconds in between are
    # roughly ten minutes.
    # NOTE(review): the statuses the service reports for failed queries are
    # not visible here; if they are known, stop polling on them right away.
    poll_interval_seconds = 2
    max_poll_attempts = 300

    try:
        # construction of the final url for the first POST request
        url = server + serviceBaseUrl + service
        # the request body has to be bytes
        data = json.dumps(query).encode("utf-8")
        res = make_request(url, 'POST', data)
        post_id = res['queryId']  # retrieve the ID of the POST request
        print('post id', post_id)

        # Construct the GET request for POLLING the server using the POST ID
        get_url = server + serviceBaseUrl + "/queries" + "/" + post_id
        print("get_url: ", get_url)

        # POLLING the server until the status is SUCCESSFUL or the attempts
        # are used up
        status_temp = None
        for _ in range(max_poll_attempts):
            res_temp = make_request(get_url, 'GET')
            status_temp = res_temp['status']
            print('current status', status_temp)
            if status_temp == 'SUCCESSFUL':
                break
            time.sleep(poll_interval_seconds)

        # If the status is SUCCESSFUL, we retrieve the results using the POST ID in the json format
        if status_temp == 'SUCCESSFUL':
            get_url_result = get_url + "/result?format=application%2Fjson"
            print("get_url_result: ", get_url_result)
            res_final = make_request(get_url_result, 'GET')

            # writing the JSON response to CSV file
            # assumes the result is a JSON array of flat objects that share
            # the same keys - TODO confirm against the service
            with open("vin_ccu_brake_events.csv", "w", newline='') as csv_file:
                output = csv.writer(csv_file)
                # An empty result has no first row to take the header from;
                # indexing it unconditionally raised IndexError.
                if res_final:
                    output.writerow(res_final[0].keys())  # header row
                for row in res_final:
                    output.writerow(row.values())
            print("-----------------------------final response--------------------------------")
            print(res_final)
        else:
            print("Error in data processing on the server: ", status_temp)
    except IOError as e:
        print("Problem with the request..")
        print(e)
        # Only HTTP error responses carry a readable body. Other IOErrors
        # (e.g. a failed connection or a CSV file that cannot be written)
        # have no read(), and calling it unconditionally raised
        # AttributeError inside this handler.
        read_body = getattr(e, 'read', None)
        if callable(read_body):
            print(read_body())


if __name__ == '__main__':
    main()