-
Sistemas Distribuidos authored5b2f19ef
package main
import (
"context"
"encoding/json"
"log"
"net"
"fmt"
"time"
//"os"
"strconv"
"math/rand"
//"io/ioutil"
"google.golang.org/grpc/status"
"google.golang.org/grpc/codes"
"google.golang.org/grpc"
pb "gitlab.labcomp.cl/mnbarrio/e-library/proto"
)
const(
portcliente = ":50051"
)
//Estructuras
type Server struct{
pb.UnimplementedMensajeServer
pb.UnimplementedDataToDataServer
}
type propu struct{
Nombre string `json:"nombre"`
Port string `json:"puerto"`
Data []string `json:"datos"`
}
type toSend struct{
Nombre string `json:"nombre"`
Numero int64 `json:"numero"`
Data []byte `json:"datos"`
}
type Libro struct{
/*
nombre: String con el nombre del libro a subir
chunk: Slice que contiene la data ([]byte) del chunk i
*/
nombre string
datachunk []Chunk
}
type Chunk struct{
Numero int64
Data []byte
}
type ChunksInNode struct{
nombre string
datachunk []Chunk
}
//Variables Utiles
var (
propuestaTmp = make([][]byte,0)
ports = []string{"localhost:50051", "localhost:50052", "localhost:50053"}
nombreLibro string
//Lista con la propuesta a utilizar
propuestaSlc = make([]propu,0)
7172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140
//slice libros entrantes
libros = make([]Libro,0)
//lista de chunks en ele nodo
chunks_in_node ChunksInNode
)
//Funciones
func cant_nodos(nodos []bool) int{
cn := 0
for i := 0; i<len(nodos); i++ {
if nodos[i]== true{
cn = cn + 1
}
}
return cn
}
func randNumber(node int) int{
seed := rand.NewSource(time.Now().UnixNano())
rs := rand.New(seed)
//cantidad de data nodes
return rs.Intn(node)
}
func GenerarPropuesta(nodos []bool, nodos_activos int, chunks int){
//Borrar data
propuestaTmp = propuestaTmp[0:0]
p := make([][]string,3)
for i := 0; i<chunks; i++ {
pos := randNumber(nodos_activos)
switch pos + 1 {
case 1:
p[0] = append(p[0], strconv.Itoa(i))
case 2:
p[1] = append(p[1], strconv.Itoa(i))
case 3:
p[2] = append(p[2], strconv.Itoa(i))
}
}
cont :=0
for i := 0; i<len(nodos); i++ {
if nodos[i] == true{
//hacer el json
emp := &propu{Nombre: libros[0].nombre ,Port: ports[i], Data:p[cont]}
e, err := json.Marshal(emp)
if err != nil {
fmt.Println(err)
return
}
propuestaTmp = append(propuestaTmp, e)
cont = cont + 1
}
}
fmt.Println("Propuesta generada")
}
//La función ClienteUploader hace un streaming client-side
func (s* Server) ClienteUploader (stream pb.Mensaje_ClienteUploaderServer) error {
log.Printf("Comenzando a subir el libro")
CantChunks := 0
var aux Libro
for{
CantChunks = CantChunks + 1
141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210
req, err := stream.Recv()
if err != nil{
log.Print(status.Errorf(codes.Unknown, "No se logró hacer streaming"))
return err
}
//Guardar en Memoria (Estructura)
var aux_chunk Chunk
aux.nombre = req.GetNombre()
aux_chunk.Numero = req.GetChunkNumb()
partBuffer := req.GetChunkData()
size := len(partBuffer)
aux_chunk.Data = partBuffer
aux.datachunk = append(aux.datachunk, aux_chunk)
if size != 256000 {
break
}
}
libros = append(libros, aux)
for j := 0; j<len(libros);j++{
fmt.Printf("\nSe ha recibido %d particiones de "+libros[j].nombre+"\n", len(libros[j].datachunk))
}
menu :=
`
[ 1 ] Centralizada
[ 2 ] Distribuida (No disponible)
¿Cómo desea que se distribuyan los datos? `
fmt.Printf(menu)
var eleccion int //Declarar variable y tipo antes de escanear, esto es obligatorio
fmt.Scanln(&eleccion)
switch eleccion{
case 1:
DC_Connect()
case 2:
fmt.Println("Esta opción no está disponible")
}
return nil
}
func ServerClientConnect(){
//conectar con cliente uploader
fmt.Printf("Iniciando Data Node #1 como Server \n")
lis, err := net.Listen("tcp", portcliente)
if err != nil {
log.Fatalf("Error conectando con el puerto (err): %v", err)
}
fmt.Printf("Conectando con el servidor ..." + portcliente+ "\n")
s := grpc.NewServer()
pb.RegisterMensajeServer(s, &Server{})
if err := s.Serve(lis); err!= nil {
log.Fatalf("Error al servir al cliente (err): %v", err)
}
}
func ServerDatanodetoDatanode(port string){
lis, err := net.Listen("tcp", port)
if err != nil {
log.Fatalf("Error conectando con el puerto (err): %v", err)
}
fmt.Printf("Conectando con el servidor ..." + port+ "\n")