from __future__ import absolute_import
import os
import logging
from urllib.request import urlopen
from traitlets import (
observe,
Bool, Dict, Instance, Int, List, TraitError, Unicode, validate
)
from ipywidgets import DOMWidget, Image, Video, Audio, register, widget_serialization
import ipywebrtc._version
import traitlets
logger = logging.getLogger("jupyter-webrtc")
semver_range_frontend = "~" + ipywebrtc._version.__version_js__
# for backwards compatibility with ipyvolume
HasStream = MediaStream
[docs]@register
class ImageStream(MediaStream):
"""Represent a media stream by a static image"""
_model_name = Unicode('ImageStreamModel').tag(sync=True)
image = Instance(
Image,
help="An ipywidgets.Image instance that will be the source of the media stream."
).tag(sync=True, **widget_serialization)
[docs] @classmethod
def from_file(cls, filename, **kwargs):
"""Create a `ImageStream` from a local file.
Parameters
----------
filename: str
The location of a file to read into the value from disk.
**kwargs
Extra keyword arguments for `ImageStream`
"""
return cls(image=Image.from_file(filename), **kwargs)
[docs] @classmethod
def from_url(cls, url, **kwargs):
"""Create a `ImageStream` from a url.
This will create a `ImageStream` from an Image using its url
Parameters
----------
url: str
The url of the file that will be used for the .image trait.
**kwargs
Extra keyword arguments for `ImageStream`
"""
return cls(image=Image.from_url(url), **kwargs)
[docs] @classmethod
def from_download(cls, url, **kwargs):
"""Create a `ImageStream` from a url by downloading
Parameters
----------
url: str
The url of the file that will be downloadeded and its bytes
assigned to the value trait of the video trait.
**kwargs
Extra keyword arguments for `ImageStream`
"""
ext = os.path.splitext(url)[1]
if ext:
format = ext[1:]
image = Image(value=urlopen(url).read(), format=format)
return cls(image=image, **kwargs)
[docs]@register
class VideoStream(MediaStream):
"""Represent a stream of a video element"""
_model_name = Unicode('VideoStreamModel').tag(sync=True)
video = Instance(
Video,
allow_none=False,
help="An ipywidgets.Video instance that will be the source of the media stream."
).tag(sync=True, **widget_serialization)
playing = Bool(True, help='Plays the videostream or pauses it.').tag(sync=True)
[docs] @classmethod
def from_file(cls, filename, **kwargs):
"""Create a `VideoStream` from a local file.
Parameters
----------
filename: str
The location of a file to read into the value from disk.
**kwargs
Extra keyword arguments for `VideoStream`
"""
video = Video.from_file(filename, autoplay=False, controls=False)
return cls(video=video, **kwargs)
[docs] @classmethod
def from_url(cls, url, **kwargs):
"""Create a `VideoStream` from a url.
This will create a `VideoStream` from a Video using its url
Parameters
----------
url: str
The url of the file that will be used for the .video trait.
**kwargs
Extra keyword arguments for `VideoStream`
"""
video = Video.from_url(url, autoplay=False, controls=False)
return cls(video=video, **kwargs)
[docs] @classmethod
def from_download(cls, url, **kwargs):
"""Create a `VideoStream` from a url by downloading
Parameters
----------
url: str
The url of the file that will be downloadeded and its bytes
assigned to the value trait of the video trait.
**kwargs
Extra keyword arguments for `VideoStream`
"""
ext = os.path.splitext(url)[1]
if ext:
format = ext[1:]
video = Video(value=urlopen(url).read(), format=format, autoplay=False, controls=False)
return cls(video=video, **kwargs)
[docs]@register
class AudioStream(MediaStream):
"""Represent a stream of an audio element"""
_model_name = Unicode('AudioStreamModel').tag(sync=True)
_view_name = Unicode('AudioStreamView').tag(sync=True)
audio = Instance(
Audio,
help="An ipywidgets.Audio instance that will be the source of the media stream."
).tag(sync=True, **widget_serialization)
playing = Bool(True, help='Plays the audiostream or pauses it.').tag(sync=True)
[docs] @classmethod
def from_file(cls, filename, **kwargs):
"""Create a `AudioStream` from a local file.
Parameters
----------
filename: str
The location of a file to read into the audio value from disk.
**kwargs
Extra keyword arguments for `AudioStream`
"""
audio = Audio.from_file(filename, autoplay=False, controls=False)
return cls(audio=audio, **kwargs)
[docs] @classmethod
def from_url(cls, url, **kwargs):
"""Create a `AudioStream` from a url.
This will create a `AudioStream` from an Audio using its url
Parameters
----------
url: str
The url of the file that will be used for the .audio trait.
**kwargs
Extra keyword arguments for `AudioStream`
"""
audio = Audio.from_url(url, autoplay=False, controls=False)
return cls(audio=audio, **kwargs)
[docs] @classmethod
def from_download(cls, url, **kwargs):
"""Create a `AudioStream` from a url by downloading
Parameters
----------
url: str
The url of the file that will be downloadeded and its bytes
assigned to the value trait of the video trait.
**kwargs
Extra keyword arguments for `AudioStream`
"""
ext = os.path.splitext(url)[1]
if ext:
format = ext[1:]
audio = Audio(value=urlopen(url).read(), format=format, autoplay=False, controls=False)
return cls(audio=audio, **kwargs)
[docs]@register
class CameraStream(MediaStream):
"""Represents a media source by a camera/webcam/microphone using
getUserMedia. See
https://developer.mozilla.org/en-US/docs/Web/API/MediaDevices/getUserMedia
for more detail.
The constraints trait can be set to specify constraints for the camera or
microphone, which is described in the documentation of getUserMedia, such
as in the link above,
Two convenience methods are avaiable to easily get access to the 'front'
and 'back' camera, when present
>>> CameraStream.facing_user(audio=False)
>>> CameraStream.facing_environment(audio=False)
"""
_model_name = Unicode('CameraStreamModel').tag(sync=True)
# Specify audio constraint and video constraint
# see https://developer.mozilla.org/en-US/docs/Web/API/MediaDevices/getUserMedia
constraints = Dict(
{'audio': True, 'video': True},
help='Constraints for the camera, see https://developer.mozilla.org/en-US/docs/Web/API/MediaDevices/getUserMedia for details.'
).tag(sync=True)
[docs] @classmethod
def facing_user(cls, audio=True, **kwargs):
"""Convenience method to get the camera facing the user (often front)
Parameters
----------
audio: bool
Capture audio or not
**kwargs
Extra keyword arguments passed to the `CameraStream`
"""
return cls._facing(facing_mode='user', audio=audio, **kwargs)
[docs] @classmethod
def facing_environment(cls, audio=True, **kwargs):
"""Convenience method to get the camera facing the environment (often the back)
Parameters
----------
audio: bool
Capture audio or not
**kwargs
Extra keyword arguments passed to the `CameraStream`
"""
return cls._facing(facing_mode='environment', audio=audio, **kwargs)
@staticmethod
def _facing(facing_mode, audio=True, **kwargs):
kwargs = dict(kwargs)
constraints = kwargs.pop('constraints', {})
if 'audio' not in constraints:
constraints['audio'] = audio
if 'video' not in constraints:
constraints['video'] = {}
constraints['video']['facingMode'] = facing_mode
return CameraStream(constraints=constraints, **kwargs)
[docs]class Recorder(DOMWidget):
_model_module = Unicode('jupyter-webrtc').tag(sync=True)
_view_module = Unicode('jupyter-webrtc').tag(sync=True)
_view_module_version = Unicode(semver_range_frontend).tag(sync=True)
_model_module_version = Unicode(semver_range_frontend).tag(sync=True)
stream = Instance(
MediaStream, allow_none=True, help="An instance of :class:`MediaStream` that is the source for recording."
).tag(sync=True, **widget_serialization)
filename = Unicode('record', help='The filename used for downloading or auto saving.').tag(sync=True)
format = Unicode('webm', help='The format of the recording.').tag(sync=True)
recording = Bool(False, help='(boolean) Indicator and controller of the recorder state, i.e. putting the value to True will start recording.').tag(sync=True)
autosave = Bool(False, help='If true, will save the data to a file once the recording is finished (based on filename and format)').tag(sync=True)
_data_src = Unicode('').tag(sync=True)
[docs] def download(self):
"""Download the recording (usually a popup appears in the browser)"""
self.send({'msg': 'download'})
[docs]@register
class ImageRecorder(Recorder):
"""Creates a recorder which allows to grab an Image from a MediaStream widget.
"""
_model_name = Unicode('ImageRecorderModel').tag(sync=True)
_view_name = Unicode('ImageRecorderView').tag(sync=True)
image = Instance(Image).tag(sync=True, **widget_serialization)
format = Unicode('png', help='The format of the image.').tag(sync=True)
_width = Unicode().tag(sync=True)
_height = Unicode().tag(sync=True)
def __init__(self, format='png', filename=Recorder.filename.default_value, recording=False, autosave=False, **kwargs):
super(ImageRecorder, self).__init__(
format=format, filename=filename, recording=recording, autosave=autosave, **kwargs)
if 'image' not in kwargs:
# Set up initial observer on child:
self.image.observe(self._check_autosave, 'value')
@traitlets.default('image')
def _default_image(self):
return Image(width=self._width, height=self._height, format=self.format)
@observe('_width')
def _update_image_width(self, change):
self.image.width = self._width
@observe('_height')
def _update_image_height(self, change):
self.image.height = self._height
@observe('format')
def _update_image_format(self, change):
self.image.format = self.format
@observe('image')
def _bind_image(self, change):
if change.old:
change.old.unobserve(self._check_autosave, 'value')
change.new.observe(self._check_autosave, 'value')
def _check_autosave(self, change):
if len(self.image.value) and self.autosave:
self.save()
[docs] def save(self, filename=None):
"""Save the image to a file, if no filename is given it is based on the filename trait and the format.
>>> recorder = ImageRecorder(filename='test', format='png')
>>> ...
>>> recorder.save() # will save to test.png
>>> recorder.save('foo') # will save to foo.png
>>> recorder.save('foo.dat') # will save to foo.dat
"""
filename = filename or self.filename
if '.' not in filename:
filename += '.' + self.format
if len(self.image.value) == 0:
raise ValueError('No data, did you record anything?')
with open(filename, 'wb') as f:
f.write(self.image.value)
[docs]@register
class VideoRecorder(Recorder):
"""Creates a recorder which allows to record a MediaStream widget, play the
record in the Notebook, and download it or turn it into a Video widget.
For help on supported values for the "codecs" attribute, see
https://stackoverflow.com/questions/41739837/all-mime-types-supported-by-mediarecorder-in-firefox-and-chrome
"""
_model_name = Unicode('VideoRecorderModel').tag(sync=True)
_view_name = Unicode('VideoRecorderView').tag(sync=True)
video = Instance(Video).tag(sync=True, **widget_serialization)
codecs = Unicode('', help='Optional codecs for the recording, e.g. "vp8" or "vp9, opus".').tag(sync=True)
def __init__(self, format='webm', filename=Recorder.filename.default_value, recording=False, autosave=False, **kwargs):
super(VideoRecorder, self).__init__(
format=format, filename=filename, recording=recording, autosave=autosave, **kwargs)
if 'video' not in kwargs:
# Set up initial observer on child:
self.video.observe(self._check_autosave, 'value')
@traitlets.default('video')
def _default_video(self):
return Video(format=self.format, controls=True)
@observe('format')
def _update_video_format(self, change):
self.video.format = self.format
@observe('video')
def _bind_video(self, change):
if change.old:
change.old.unobserve(self._check_autosave, 'value')
change.new.observe(self._check_autosave, 'value')
def _check_autosave(self, change):
if len(self.video.value) and self.autosave:
self.save()
[docs] def save(self, filename=None):
"""Save the video to a file, if no filename is given it is based on the filename trait and the format.
>>> recorder = VideoRecorder(filename='test', format='mp4')
>>> ...
>>> recorder.save() # will save to test.mp4
>>> recorder.save('foo') # will save to foo.mp4
>>> recorder.save('foo.dat') # will save to foo.dat
"""
filename = filename or self.filename
if '.' not in filename:
filename += '.' + self.format
if len(self.video.value) == 0:
raise ValueError('No data, did you record anything?')
with open(filename, 'wb') as f:
f.write(self.video.value)
[docs]@register
class AudioRecorder(Recorder):
"""Creates a recorder which allows to record the Audio of a MediaStream widget, play the
record in the Notebook, and download it or turn it into an Audio widget.
For help on supported values for the "codecs" attribute, see
https://stackoverflow.com/questions/41739837/all-mime-types-supported-by-mediarecorder-in-firefox-and-chrome
"""
_model_name = Unicode('AudioRecorderModel').tag(sync=True)
_view_name = Unicode('AudioRecorderView').tag(sync=True)
audio = Instance(Audio).tag(sync=True, **widget_serialization)
codecs = Unicode('', help='Optional codecs for the recording, e.g. "opus".').tag(sync=True)
def __init__(self, format='webm', filename=Recorder.filename.default_value, recording=False, autosave=False, **kwargs):
super(AudioRecorder, self).__init__(
format=format, filename=filename, recording=recording, autosave=autosave, **kwargs)
if 'audio' not in kwargs:
# Set up initial observer on child:
self.audio.observe(self._check_autosave, 'value')
@traitlets.default('audio')
def _default_audio(self):
return Audio(format=self.format, controls=True)
@observe('format')
def _update_audio_format(self, change):
self.audio.format = self.format
@observe('audio')
def _bind_audio(self, change):
if change.old:
change.old.unobserve(self._check_autosave, 'value')
change.new.observe(self._check_autosave, 'value')
def _check_autosave(self, change):
if len(self.audio.value) and self.autosave:
self.save()
[docs] def save(self, filename=None):
"""Save the audio to a file, if no filename is given it is based on the filename trait and the format.
>>> recorder = AudioRecorder(filename='test', format='mp3')
>>> ...
>>> recorder.save() # will save to test.mp3
>>> recorder.save('foo') # will save to foo.mp3
>>> recorder.save('foo.dat') # will save to foo.dat
"""
filename = filename or self.filename
if '.' not in filename:
filename += '.' + self.format
if len(self.audio.value) == 0:
raise ValueError('No data, did you record anything?')
with open(filename, 'wb') as f:
f.write(self.audio.value)
[docs]@register
class WebRTCPeer(DOMWidget):
"""A peer-to-peer webrtc connection"""
_model_module = Unicode('jupyter-webrtc').tag(sync=True)
_view_module = Unicode('jupyter-webrtc').tag(sync=True)
_view_name = Unicode('WebRTCPeerView').tag(sync=True)
_model_name = Unicode('WebRTCPeerModel').tag(sync=True)
_view_module_version = Unicode(semver_range_frontend).tag(sync=True)
_model_module_version = Unicode(semver_range_frontend).tag(sync=True)
stream_local = Instance(MediaStream, allow_none=True).tag(sync=True, **widget_serialization)
stream_remote = Instance(MediaStream, allow_none=True).tag(sync=True, **widget_serialization)
id_local = Unicode('').tag(sync=True)
id_remote = Unicode('').tag(sync=True)
connected = Bool(False, read_only=True).tag(sync=True)
failed = Bool(False, read_only=True).tag(sync=True)
[docs] def connect(self):
self.send({'msg': 'connect'})
[docs]@register
class WebRTCRoom(DOMWidget):
"""A 'chatroom', which consists of a list of :`WebRTCPeer` connections
"""
_model_module = Unicode('jupyter-webrtc').tag(sync=True)
_view_module = Unicode('jupyter-webrtc').tag(sync=True)
_model_name = Unicode('WebRTCRoomModel').tag(sync=True)
_view_module_version = Unicode(semver_range_frontend).tag(sync=True)
_model_module_version = Unicode(semver_range_frontend).tag(sync=True)
room = Unicode('room').tag(sync=True)
stream = Instance(MediaStream, allow_none=True).tag(sync=True, **widget_serialization)
room_id = Unicode(read_only=True).tag(sync=True)
nickname = Unicode('anonymous').tag(sync=True)
peers = List(Instance(WebRTCPeer), [], allow_none=False).tag(sync=True, **widget_serialization)
streams = List(Instance(MediaStream), [], allow_none=False).tag(sync=True, **widget_serialization)
[docs]@register
class WebRTCRoomLocal(WebRTCRoom):
_model_name = Unicode('WebRTCRoomLocalModel').tag(sync=True)
[docs]@register
class WebRTCRoomMqtt(WebRTCRoom):
"""Use a mqtt server to connect to other peers"""
_model_name = Unicode('WebRTCRoomMqttModel').tag(sync=True)
server = Unicode('wss://iot.eclipse.org:443/ws').tag(sync=True)
# add all help strings to the __doc__ for the api docstrings
for name, cls in list(vars().items()):
try:
if issubclass(cls, traitlets.HasTraits):
for trait_name, trait in cls.class_traits().items():
if 'help' in trait.metadata:
trait.__doc__ = trait.metadata['help']
except TypeError:
pass