How I cut my 8-hour Python image dataset generation process down to a mere 30 minutes
As a data scientist, I spend a lot of time preparing data. I regularly have to preprocess big chunks of data, and I can’t always predict what errors I will encounter across the whole dataset.
Once your process is foolproof, you have to make a final run with all the data and, depending on your machine, it can take several hours or even days.
Usually, when you have lots of data to process with Python, you quickly have to resort to multiprocessing to parallelize the jobs, like in the next graph.
This is usually as CPU-intensive as you can get with Python; however, it has some issues:
- It isn’t suitable when order matters
- Each worker process performs all the operations and will thus consume about as much RAM as a single program execution
- The syntax per se is kinda ugly (yup, I’m picky)
So I rolled up my sleeves and wrote a package that addresses those issues while staying as efficient as multiprocessing: Olympipe.
The main goal: use one thread for each costly operation and chain those threads as a pipeline, so that each file goes through them in the right order.
As you can see, the total computing time now depends only on the longest operation, order is preserved, and memory is managed efficiently, since each thread only needs the context of its own operation.
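To make the idea concrete, here is a minimal sketch of that pipeline model using plain threads and queues (this is an illustration of the concept, not olympipe’s actual implementation): one worker per stage, and items flow from stage to stage in order because queues are FIFO.

```python
import threading
from queue import Queue

SENTINEL = object()  # marks the end of the stream

def stage(fn, q_in, q_out):
    # Each stage runs in its own thread: pull from q_in, apply fn,
    # push downstream. Order is preserved because queues are FIFO.
    while True:
        item = q_in.get()
        if item is SENTINEL:
            q_out.put(SENTINEL)
            break
        q_out.put(fn(item))

q0, q1, q2 = Queue(), Queue(), Queue()
threading.Thread(target=stage, args=(lambda x: x * 2, q0, q1)).start()
threading.Thread(target=stage, args=(lambda x: x + 1, q1, q2)).start()

for x in range(5):
    q0.put(x)
q0.put(SENTINEL)

results = []
while True:
    item = q2.get()
    if item is SENTINEL:
        break
    results.append(item)
print(results)  # [1, 3, 5, 7, 9]
```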
Here is what a simple task looks like:
def times_2(x: int) -> int:
    return x * 2
datas = list(range(10))
# classic process
res = []
for data in datas:
    res.append(times_2(data))
print(res) # [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
Here is what it looks like with olympipe:
from olympipe import Pipeline
def times_2(x: int) -> int:
    return x * 2
datas = list(range(10))
# olympipe process
p = Pipeline(datas)
p1 = p.task(times_2)
res = p1.wait_for_result()
print(res) # [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
It can also be chained if needed:
from olympipe import Pipeline
def times_2(x: int) -> int:
    return x * 2
datas = list(range(10))
# olympipe process
res = Pipeline(datas).task(times_2).wait_for_result()
print(res) # [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
Please note that the data source can be any iterable. A generator sending camera images, sound, or even HTTP payloads is also viable.
Another interesting feature is that it supports non-linear workflows.
Here is a short example:
import json
import re
from typing import List
from olympipe import Pipeline
with open("client_data.json", "r") as f:
    data: List[str] = json.load(f)
phone_regex = re.compile(r"^[\+]?[(]?[0-9]{3}[)]?[-\s\.]?[0-9]{3}[-\s\.]?[0-9]{4,6}$")
# Note: no surrounding slashes — those are JavaScript regex delimiters
# and would break matching in Python
email_regex = re.compile(
    r"^[a-zA-Z0-9.!#$%&'*+/=?^_`{|}~-]+@[a-zA-Z0-9-]+(?:\.[a-zA-Z0-9-]+)*$"
)
pipe = Pipeline(data)
email_missing = pipe.filter(lambda x: not bool(email_regex.match(x)))
email_valid = pipe.filter(lambda x: bool(email_regex.match(x)))
phone_missing = email_valid.filter(lambda x: not bool(phone_regex.match(x)))
all_info_ok = email_valid.filter(lambda x: bool(phone_regex.match(x)))
res_invalid_email, res_invalid_phone, res_ok = Pipeline.wait_for_all_results(
    [
        email_missing,
        phone_missing,
        all_info_ok,
    ]
)
print(len(res_ok), "Valid client information")
So that’s it, you now have the keys to build your own local data pipelines. More examples are available in the documentation: https://pypi.org/project/olympipe/
Feel free to post feedback on the project, or reach out if you have needs not addressed by the current version.